Performing Real-time and Periodic ETL

Introduction

In many real-life scenarios that involve an integrated enterprise, data needs to be loaded and moved from one location to another so that it can be processed for future reference. This process is commonly known as Extract, Transform, and Load (ETL). In this tutorial, you can learn how ETL is carried out in real time and periodically.

Let's consider that the Sweet Factory has shops distributed all over the world. In addition, there are travelling salespeople who operate from mobile sweet trucks. Each salesperson reports transactions via a system that generates JMS messages. This data needs to be saved in a database so that it can be used later for sales analysis and financial analysis. Later, it needs to be moved to other databases depending on the nature of the analysis carried out.

Before you begin:

In this scenario, you need to enrich the information sent by salespeople by joining the events they generate with records stored in MySQL. You need to download and install MySQL, and create the TransactionDataDB, UserDataDB, and TriggerStateDB databases before you carry out the tutorial steps.

To configure the databases, follow the steps below.
  1. Download and install MySQL Server.

  2. Download the MySQL JDBC driver.

  3. Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the <SP_HOME>/lib directory.

  4. Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
    mysql -u username -p 
  5. When prompted, specify the password you are using to access the databases with the username you specified.
  6. Add the following configurations for three data sources under the Data Sources Configuration section of the <SP_HOME>/conf/editor/deployment.yaml file.

    You need to change the values for the username and password parameters to the username and password that you are using to access the MySQL database.

      - name: TransactionDataDB
        description: Datasource used for Sales Records
        jndiConfig:
          name: jdbc/test
          useJndiReference: true
        definition:
          type: RDBMS
          configuration:
            jdbcUrl: 'jdbc:mysql://localhost:3306/TransactionDataDB?useSSL=false'
            username: root
            password: root
            driverClassName: com.mysql.jdbc.Driver
            maxPoolSize: 50
            idleTimeout: 60000
            connectionTestQuery: SELECT 1
            validationTimeout: 30000
            isAutoCommit: false
    
    
      - name: UserDataDB
        description: Datasource used for User Data
        jndiConfig:
          name: jdbc/test
          useJndiReference: true
        definition:
          type: RDBMS
          configuration:
            jdbcUrl: 'jdbc:mysql://localhost:3306/UserDataDB?useSSL=false'
            username: root
            password: root
            driverClassName: com.mysql.jdbc.Driver
            maxPoolSize: 50
            idleTimeout: 60000
            connectionTestQuery: SELECT 1
            validationTimeout: 30000
            isAutoCommit: false
    
    
      - name: TriggerStateDB
        description: Datasource used to store the Last Processed ID
        jndiConfig:
          name: jdbc/test
          useJndiReference: true
        definition:
          type: RDBMS
          configuration:
            jdbcUrl: 'jdbc:mysql://localhost:3306/TriggerStateDB?useSSL=false'
            username: root
            password: root
            driverClassName: com.mysql.jdbc.Driver
            maxPoolSize: 50
            idleTimeout: 60000
            connectionTestQuery: SELECT 1
            validationTimeout: 30000
            isAutoCommit: false
  7. To create the three databases named TransactionDataDB, UserDataDB, and TriggerStateDB, issue the following commands from the terminal.
    • To create the TransactionDataDB database:
      mysql> create database TransactionDataDB;
      mysql> use TransactionDataDB;
      mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
      mysql> grant all on TransactionDataDB.* TO username@localhost identified by "password";

    • To create the UserDataDB database:
      mysql> create database UserDataDB;
      mysql> use UserDataDB;
      mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
      mysql> grant all on UserDataDB.* TO username@localhost identified by "password";

    • To create the TriggerStateDB database:
      mysql> create database TriggerStateDB;
      mysql> use TriggerStateDB;
      mysql> source <SP_HOME>/wso2/editor/dbscripts/metrics/mysql.sql;
      mysql> grant all on TriggerStateDB.* TO username@localhost identified by "password";


Tutorial steps

This section covers two inter-related scenarios that are simple examples of performing real-time and periodic ETL in real-world business scenarios.


Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.

Once the head office of the Sweet Factory receives events with information about the sales transactions, that information needs to be cleaned and enriched via a connection with a data store. Then the enriched version of the data needs to be saved in another RDBMS store.

Let's get started!

  1. First, let's enter the required Siddhi queries to extract and clean data.
    1. The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesman), the transaction amount and the location. Let's begin by adding the stream definition to capture this information.

      define stream TransactionStream (userId long, transactionAmount double, location string);
    2. Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values, and replacing them with default values. To do this, let's follow the steps below:
      1. The data to be cleaned is taken from the TransactionStream input stream that you previously defined. To specify this, let's add a from clause as follows.

        from TransactionStream

      2. Let's select the data you need to extract and clean. This includes the user ID, transaction amount and the location. For travelling salespeople, specific locations are not registered in the system. As a result, no value is specified in the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the null value for the location attribute of such events with UNKNOWN. This can be achieved via the ifThenElse() function of Siddhi by including it in the select clause as shown below.

        from TransactionStream
        select userId,
        transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location

      3. Now let's add an inferred output stream so that the information extracted and cleaned can be directed to it.

        from TransactionStream
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
        insert into CleanedTransactionStream;

      4. Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application in order to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query as CleaningData. The completed query looks as follows.

        @info(name = 'CleaningData')
        from TransactionStream
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
        insert into CleanedTransactionStream;

        The Siddhi application currently looks as follows:

        define stream TransactionStream (userId long, transactionAmount double, location string);

        @info(name = 'CleaningData')
        from TransactionStream
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
        insert into CleanedTransactionStream;
  2. The information you have extracted and cleaned needs to be enriched. To do this, let's add the required queries as follows.
    1. To enrich the data from the CleanedTransactionStream stream, events need to be joined with records that are already saved in a database table. To do this, the table needs to be defined within the Siddhi application. Therefore, let's add the table definition as follows.

      define stream TransactionStream (userId long, transactionAmount double, location string);

      define table UserTable (userId long, firstName string, lastName string);

      @info(name = 'CleaningData')
      from TransactionStream
      select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTransactionStream;

    2. To use the information in this table, make a reference to the related data source via the @store annotation as follows.

      define stream TransactionStream (userId long, transactionAmount double, location string);

      @store(type = 'rdbms', datasource = 'UserDataDB')
      define table UserTable (userId long, firstName string, lastName string);

      @info(name = 'CleaningData')
      from TransactionStream
      select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTransactionStream;

      To make it easy for other users to understand the purpose of this table, you can also add a comment for the table as shown below.

      -- Table used to enrich data
      @store(type = 'rdbms', datasource = 'UserDataDB')
      define table UserTable (userId long, firstName string, lastName string);

    3. You have already directed the extracted and cleaned information to the CleanedTransactionStream stream. Therefore, let's specify that stream as the input stream for this query.

      from CleanedTransactionStream

    4. The user IDs of the registered salespeople are already saved in the UserTable table. To enrich the data, the user IDs of the events in the CleanedTransactionStream stream need to be joined with the user IDs stored in the table. For this, let's add a join as follows.

      from CleanedTransactionStream as c join UserTable as u on c.userId == u.userId

    5. Now, let's include a concatenation to derive a username from the first names and the last names of the salespeople as follows.

      select c.userId, str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location

    6. The enriched data can be directed to another inferred output stream named EnrichedTransactionStream.

      from CleanedTransactionStream as c join UserTable as u
      on c.userId == u.userId
      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location
      insert into EnrichedTransactionStream;

    7. Let's complete the query you created for enriching data by naming it as EnrichData.

      @info(name = 'EnrichData')
      from CleanedTransactionStream as c join UserTable as u
      on c.userId == u.userId
      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location
      insert into EnrichedTransactionStream;

      Now the partially completed Siddhi application looks as follows:

      define stream TransactionStream (userId long, transactionAmount double, location string);

      -- Table used to enrich data
      @store(type = 'rdbms', datasource = 'UserDataDB')
      define table UserTable (userId long, firstName string, lastName string);

      @info(name = 'CleaningData')
      from TransactionStream
      select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTransactionStream;

      @info(name = 'EnrichData')
      from CleanedTransactionStream as c join UserTable as u
          on c.userId == u.userId
      select c.userId,
             str:concat( u.firstName, " ", u.lastName) as userName,
             transactionAmount,
             location
      insert into EnrichedTransactionStream;
  3. Now you have done the required configurations to extract data from JMS and enrich it. To save the enriched data in another database table, let's follow the steps given below.

    1. Each enriched record stored should include the user ID, transaction amount and the location. Let's add the definition of the table to store the enriched data as shown below.

      -- Final table to load the data
      @store(type = 'rdbms' , datasource = 'TransactionDataDB')
      define table TransactionTable (userId long, transactionAmount double, location string);

    2. To insert the enriched data to the table you created, let's add another Siddhi query as follows. This query takes the enriched events from the EnrichedTransactionStream inferred stream (which receives the enriched data), selects the attributes that match the table schema, and inserts them into the TransactionTable table you defined.

      @info(name = 'LoadData')
      from EnrichedTransactionStream
      select userId, transactionAmount, location
      insert into TransactionTable;
  4. The transactions are received via JMS in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format. You can add this above the stream definition as shown below.

    @source(type = 'jms' , destination = 'transactions',
    factory.initial = '...', provider.url = '...',
    @map(type = 'json',
    @attributes('$.userId', '$.transactionAmount', '$.location')))
    define stream TransactionStream (userId long, transactionAmount double, location string);

Let's name this Siddhi application as JMS-to-DB-ETL. You can optionally add a description too. The complete Siddhi Application looks as follows:

@App:name("JMS-to-DB-ETL")
@App:description("Extract data from JMS, perform stateless transformation, and load to database")
 
-- Sample input message: {"userId":2345,
-- "transactionAmount":456.0,
-- "location": "CA, USA"}

@source(type = 'jms' , destination = 'transactions',
   factory.initial = '...', provider.url = '...',
   @map(type = 'json',
      @attributes('$.userId', '$.transactionAmount', '$.location')))
define stream TransactionStream (userId long,
                        transactionAmount double, location string);
 
-- Table used to enrich data
@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, 
                        lastName string);
 
-- Final table to load the data
@store(type = 'rdbms' , datasource = 'TransactionDataDB')
define table TransactionTable (userId long,
                        transactionAmount double, location string);
 
@info(name = 'CleaningData')
from TransactionStream
select userId,
       transactionAmount,
       ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
   on c.userId == u.userId
select c.userId,
       str:concat( u.firstName, " ", u.lastName) as userName,
       transactionAmount,
       location
insert into EnrichedTransactionStream;

-- Only the attributes in the TransactionTable schema are selected for loading
@info(name = 'LoadData')
from EnrichedTransactionStream
select userId, transactionAmount, location
insert into TransactionTable;
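
To make the data flow of this scenario easier to follow, the cleaning and enriching logic can be sketched outside Siddhi. The following is a minimal Python model written under stated assumptions, not the way WSO2 Stream Processor executes the queries: the USER_TABLE contents and the function names clean, enrich, and run_pipeline are hypothetical, and events are represented as plain dictionaries.

```python
from typing import Optional

# Hypothetical rows standing in for UserTable: userId -> (firstName, lastName).
USER_TABLE = {2345: ("Jane", "Doe")}


def clean(event: dict) -> dict:
    """Model of CleaningData: replace a null location with "UNKNOWN"."""
    location = event["location"] if event["location"] is not None else "UNKNOWN"
    return {
        "userId": event["userId"],
        "transactionAmount": event["transactionAmount"],
        "location": location,
    }


def enrich(event: dict, user_table: dict) -> Optional[dict]:
    """Model of EnrichData: join on userId and concatenate the names."""
    user = user_table.get(event["userId"])
    if user is None:
        # The query uses a plain (inner) join, so an event whose userId
        # has no matching row in the table produces no output.
        return None
    first_name, last_name = user
    return {
        "userId": event["userId"],
        "userName": f"{first_name} {last_name}",
        "transactionAmount": event["transactionAmount"],
        "location": event["location"],
    }


def run_pipeline(events: list, user_table: dict) -> list:
    """Clean and enrich each event; the result models the events handed to LoadData."""
    enriched_events = []
    for event in events:
        enriched = enrich(clean(event), user_table)
        if enriched is not None:
            enriched_events.append(enriched)
    return enriched_events
```

For example, calling run_pipeline with one event that has a null location for user 2345 and one event for an unregistered user returns a single enriched event, because the second event finds no match in the join.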


Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka

Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location based on the latest transactions every five minutes. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that the database contains the latest transactions. You can perform periodic ETL via the WSO2 Stream Processor to achieve this as shown below.

Let's get started!

  1. First, you need to ensure that the information extracted for processing is based on the latest transactions. To do that, let's collect information relating to the latest transactions as follows.
    1. You need a database table that, at any given time, contains information about which transactions were last processed. Therefore, let's start by adding the definition of this table to the Siddhi application as follows.

      -- MySQL Table to keep last processed transaction ID
      @primaryKey('key')
      @store(type = 'rdbms', datasource = 'TriggerStateDB')
      define table TriggerStateTable (key string, lastProcessedId long);

      Here, you are creating the new TriggerStateTable table in the TriggerStateDB database that you created before starting this tutorial. The key attribute in the table schema is the primary key, and its values identify the rows in the table.

    2. The TriggerStateTable table you created needs to be polled every five minutes for the last processed ID. To make this possible, let's define a trigger that generates an event in an inferred stream every five minutes. Each of these events can then be joined with the table to read the last processed ID.

      define trigger TriggerStream at every 5 min;

       

      -- MySQL Table to keep last processed transaction ID
      @store(type = 'rdbms', datasource = 'TriggerStateDB')
      define table TriggerStateTable (key string, lastProcessedId long);
    3. To derive the latest transaction by comparing the last processed ID in the TriggerStateTable table and the TriggerStream inferred stream, let's create a Siddhi query as follows: 
      1. The TriggerStateTable has information about the last processed ID (i.e., the value of the lastProcessedId attribute). To keep polling this table, let's join it with the TriggerStream stream in which events are entered every five minutes. The join can be performed as follows.

        from TriggerStream as s right outer join TriggerStateTable as t
      2. The events extracted based on the last processed ID need to be inserted into another stream so that it can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.

        from TriggerStream as s right outer join TriggerStateTable as t
        insert into DataRetrievalStream;

      3. Now you need to specify the condition based on which the last processed ID is selected from the joined TriggerStream stream and the TriggerStateTable table.

        As mentioned before, the purpose of the TriggerStream stream is to poll the TriggerStateTable table. Each new event in this stream is a request to poll the table, and once it is polled, the last processed ID derived is selected for further processing. If no value exists in the table for the lastProcessedId attribute (e.g., if the table is new), 0 must be used as the last processed ID by default. To do this, let's add a select clause with an ifThenElse condition as shown below.

        from TriggerStream as s right outer join TriggerStateTable as t
        select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId)
            as lastProcessedId
        insert into DataRetrievalStream;

      The partially completed Siddhi application looks as follows.

      Note that the query is named CollectLastProcessedId. It is recommended to name each query stating the purpose when there are multiple queries in the Siddhi application.

      define trigger TriggerStream at every 5 min;

      -- MySQL Table to keep last processed transaction ID
      @primaryKey('key')
      @store(type = 'rdbms', datasource = 'TriggerStateDB')
      define table TriggerStateTable (key string, lastProcessedId long);
      
      @info(name = 'CollectLastProcessedId')
      from TriggerStream as s right outer join TriggerStateTable as t
      select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                   as lastProcessedId
      insert into DataRetrievalStream;

  2. To extract the data that needs to be processed, let's further update the Siddhi application as follows.
    1. In the previous scenario, you stored the data to be processed in a table named TransactionTable. To access that data for this scenario, you need to include the same table definition in the Siddhi application that you are creating for this scenario as follows.

      define trigger TriggerStream at every 5 min;

      -- MySQL Table to keep last processed transaction ID
      @store(type = 'rdbms' , datasource = 'TriggerStateDB')
      define table TriggerStateTable (key string, lastProcessedId long);

      -- Store table
      @store(type = 'rdbms' , datasource = 'TransactionDataDB')
      define table TransactionTable(transactionId long, userId long,
                    transactionAmount double, transactionLocation string);

      @info(name = 'CollectLastProcessedId')
      from TriggerStream as s right outer join TriggerStateTable as t
      select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                   as lastProcessedId
      insert into DataRetrievalStream;

    2. The data to be extracted for processing needs to be newer than the last processed ID. To extract information based on this condition, let's add a Siddhi query named ExtractData as follows:
      1. A transaction should be taken from the TransactionTable table only if its transaction ID is greater than the last processed ID of an event in the DataRetrievalStream stream. Therefore, let's join this stream and table to do this comparison and extract the required events.

        @info(name = 'ExtractData') 
        from DataRetrievalStream as s join TransactionTable as t
        on s.lastProcessedId < t.transactionId

      2. The data extracted needs to be directed to an output stream. Therefore, let's add an output stream named ExtractedDataStream that can be inferred.

        from DataRetrievalStream as s join TransactionTable as t
        on s.lastProcessedId < t.transactionId

        insert into ExtractedDataStream;

      3. Now let's select the information to be inserted into the ExtractedDataStream stream for the extracted events.

        from DataRetrievalStream as s join TransactionTable as t
        on s.lastProcessedId < t.transactionId
        select t.transactionId, t.transactionAmount, t.transactionLocation 
        insert into ExtractedDataStream;

    The partially completed Siddhi application now looks as follows.

    define trigger TriggerStream at every 5 min; 
     
    -- MySQL Table to keep last processed transaction ID
    @store(type = 'rdbms' , datasource = 'TriggerStateDB')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TransactionDataDB')
    define table TransactionTable(transactionId long, userId long,
     transactionAmount double, transactionLocation string);
                  
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData') 
    from DataRetrievalStream as s join TransactionTable as t 
    on s.lastProcessedId < t.transactionId 
    select t.transactionId, t.transactionAmount, t.transactionLocation 
    insert into ExtractedDataStream;

  3. When the above query is executed, you have the latest transactions extracted and directed to the ExtractedDataStream stream. To indicate that these transactions are already considered for processing and therefore should not be selected for processing again, you need to update the TriggerStateTable table. To do this, enter a query named UpdateLastProcessedId.

    1. The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream in which the latest transactions are extracted every five minutes. To indicate this, add a from clause to the UpdateLastProcessedId query as follows.

      @info(name='UpdateLastProcessedId')

      from ExtractedDataStream
    2. When selecting records to be updated/inserted to the table, each record is identified by its key. Because the table only needs a single row that holds the last processed ID, let's add a select clause that uses the constant string lastProcessedId as the key, and the transaction ID as the value of the lastProcessedId attribute.

      @info(name='UpdateLastProcessedId')
      from ExtractedDataStream
      select "lastProcessedId" as key, transactionId as lastProcessedId
    3. The event with the last processed ID may already exist in the TriggerStateTable table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If that event does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.

      @info(name='UpdateLastProcessedId')
      from ExtractedDataStream
      select "lastProcessedId" as key, transactionId as lastProcessedId
      update or insert into TriggerStateTable
      on TriggerStateTable.key == "lastProcessedId";

    The partially created Siddhi application now looks as follows:

    define trigger TriggerStream at every 5 min; 
     
    -- MySQL Table to keep last processed transaction ID
    @store(type = 'rdbms' , datasource = 'TriggerStateDB')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TransactionDataDB')
    define table TransactionTable(transactionId long, userId long,
                  transactionAmount double, transactionLocation string);
                  
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData') 
    from DataRetrievalStream as s join TransactionTable as t 
    on s.lastProcessedId < t.transactionId 
    select t.transactionId, t.transactionAmount, t.transactionLocation 
    insert into ExtractedDataStream;
    
    
    @info(name='UpdateLastProcessedId')
    from ExtractedDataStream
    select "lastProcessedId" as key, transactionId as lastProcessedId 
    update or insert into TriggerStateTable 
    on TriggerStateTable.key == "lastProcessedId";
  4. The extracted data now needs to be transformed via a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query named StatefulAggregation to carry out this analysis.

    1. The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TransformedDataStream. Therefore, let's add the from and insert into clauses accordingly.

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream
      insert into TransformedDataStream;

    2. As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream#window.timeBatch(5 min)
      insert into TransformedDataStream;

    3. The attributes in the ExtractedDataStream stream are transactionId, transactionLocation, and transactionAmount. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values for the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream#window.timeBatch(5 min)
      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      insert into TransformedDataStream;


    4. The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by the location when presenting the results.

      @info(name = 'StatefulAggregation')
      from ExtractedDataStream#window.timeBatch(5 min)
      select transactionLocation,
      sum(transactionAmount) as totalTransactionAmount,
      avg(transactionAmount) as avgTransactionAmount,
      transactionId
      group by transactionLocation
      insert into TransformedDataStream;


    The partially created Siddhi application now looks as follows. 

    define trigger TriggerStream at every 5 min; 
    
    -- MySQL Table to keep last processed transaction ID
    @primaryKey('key')
    @store(type = 'rdbms' , datasource = 'TriggerStateDB')
    define table TriggerStateTable (key string, lastProcessedId long);

    -- Store table
    @store(type = 'rdbms' , datasource = 'TransactionDataDB')
    define table TransactionTable(transactionId long, userId long,
                  transactionAmount double, transactionLocation string);
    
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId )
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData') 
    from DataRetrievalStream as s join TransactionTable as t 
    on s.lastProcessedId < t.transactionId 
    select t.transactionId, t.transactionAmount, t.transactionLocation 
    insert into ExtractedDataStream;
    
    @info(name='UpdateLastProcessedId') 
    from ExtractedDataStream 
    select "lastProcessedId" as key, transactionId as lastProcessedId 
    update or insert into TriggerStateTable 
    on TriggerStateTable.key == "lastProcessedId";
    
    @info(name = 'StatefulAggregation')
    from ExtractedDataStream#window.timeBatch(5 min)
    select transactionLocation,
            sum(transactionAmount) as totalTransactionAmount,
            avg(transactionAmount) as avgTransactionAmount,
            transactionId
    group by transactionLocation
    insert into TransformedDataStream;
  5. The information processed via the StatefulAggregation query and directed to the TransformedDataStream stream needs to be published via Kafka in the binary format. To specify this, let's follow the steps below.

    1. The events published are taken from the TransformedDataStream stream. Therefore, the sink configuration needs to be connected to this stream. To do this, first, let's add a definition for the TransformedDataStream stream which is currently an inferred stream.

      define stream TransformedDataStream(transactionLocation string,
      totalTransactionAmount double, avgTransactionAmount double,
      transactionId long);

    2. Now let's connect a sink definition where the transport type is Kafka and the map type is binary as shown below.

      -- Sink to load the data
      @sink(type='kafka', bootstrap.servers='...',
      topic='...',is.binary.message='...',
      @map(type='binary'))
      define stream TransformedDataStream(transactionLocation string,
      totalTransactionAmount double, avgTransactionAmount double,
      transactionId long);

  6. Before saving the Siddhi application, add a name and a description to the Siddhi application as follows.

    @App:name('DB-to-Kafka-ETL')
    @App:description('Extract data from database, perform stateful transformation, and load data to kafka')
  7. Save the Siddhi application. The completed version looks as follows.

    @App:name('DB-to-Kafka-ETL')
    @App:description('Extract data from database, perform stateful transformation, and load data to kafka')
    
    define trigger TriggerStream at every 5 min;

    -- Sink to load the data
    @sink(type='kafka', bootstrap.servers='...',
       topic='...',is.binary.message='...',
       @map(type='binary'))
    define stream TransformedDataStream(transactionLocation string,
          totalTransactionAmount double, avgTransactionAmount double,
          transactionId long);
    
    -- Store table
    @store(type = 'rdbms' , datasource = 'TransactionDataDB')
    define table TransactionTable(transactionId long, userId long, 
                  transactionAmount double, transactionLocation string);
    
    -- MySQL Table to keep last processed transaction ID
    @primaryKey('key')
    @store(type = 'rdbms' , datasource = 'TriggerStateDB')
    define table TriggerStateTable (key string, lastProcessedId long);
    
    @info(name = 'CollectLastProcessedId')
    from TriggerStream as s right outer join TriggerStateTable as t
    select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId ) 
                 as lastProcessedId
    insert into DataRetrievalStream;
    
    @info(name = 'ExtractData')
    from DataRetrievalStream as s join TransactionTable as t
       on s.lastProcessedId < t.transactionId
    select t.transactionId, t.transactionAmount, t.transactionLocation
    insert into ExtractedDataStream;
    
    @info(name='UpdateLastProcessedId')
    from ExtractedDataStream
    select "lastProcessedId" as key, transactionId as lastProcessedId
    update or insert into TriggerStateTable
       on TriggerStateTable.key == "lastProcessedId";
    
    @info(name = 'StatefulAggregation')
    from ExtractedDataStream#window.timeBatch(5 min)
    select transactionLocation, 
           sum(transactionAmount) as totalTransactionAmount, 
           avg(transactionAmount) as avgTransactionAmount, 
           transactionId
    group by transactionLocation
    insert into TransformedDataStream;
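
To see how the four queries of this scenario cooperate, the polling cycle can be modelled outside Siddhi. The following Python sketch is only an illustration under stated assumptions, not how WSO2 Stream Processor runs the application: the function name poll_once and the dictionary used as the state table are hypothetical, and the five-minute time batch window is simplified to "all events extracted by one trigger firing".

```python
from collections import defaultdict


def poll_once(transactions: list, state: dict) -> dict:
    """Model one trigger firing: read checkpoint, extract, update, aggregate."""
    # CollectLastProcessedId: use 0 when no last processed ID is stored yet.
    last_processed_id = state.get("lastProcessedId", 0)

    # ExtractData: keep only transactions newer than the checkpoint
    # (s.lastProcessedId < t.transactionId).
    extracted = [
        t for t in transactions if last_processed_id < t["transactionId"]
    ]

    # UpdateLastProcessedId: every extracted event upserts the single row
    # stored under the constant key "lastProcessedId", so the last event wins.
    for t in extracted:
        state["lastProcessedId"] = t["transactionId"]

    # StatefulAggregation: total and average amount grouped by location.
    amounts_by_location = defaultdict(list)
    for t in extracted:
        amounts_by_location[t["transactionLocation"]].append(
            t["transactionAmount"]
        )
    return {
        location: {
            "total": sum(amounts),
            "avg": sum(amounts) / len(amounts),
        }
        for location, amounts in amounts_by_location.items()
    }
```

Calling poll_once twice on the same list of transactions with the same state dictionary returns aggregates the first time and an empty result the second time, which is the purpose of keeping the last processed ID: transactions that were already extracted are not selected again.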