...

  1. First, let's enter the required Siddhi queries to extract and clean data.
    1. The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesperson), the transaction amount and the location. Let's begin by adding the stream definition to capture this information.

      Panel
      define stream TrasanctionStream (userId long, transactionAmount double, location string);
    2. Before the data can be enriched for further analysis, it needs to be cleaned. This involves checking for null values and replacing them with default values. To do this, follow the steps below:
      1. The information you select is taken from the TrasanctionStream input stream that you defined previously. To specify this, add a from statement as follows.

        Panel

        from TrasanctionStream

      2. Next, select the data you need to extract and clean: the user ID, the transaction amount and the location. Travelling salespeople do not have specific locations registered in the system, so the location attribute of the events generated for some of their sales has no value (null). To clean the data before enriching it, replace the null location of such events with UNKNOWN. You can do this with the ifThenElse() function of Siddhi by including it in the select clause as shown below.

        Panel

        from TrasanctionStream
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location

      3. Now add an insert into clause that directs the extracted and cleaned information to an output stream. You do not need to define this output stream separately, because Siddhi infers its definition from the query.

        Panel

        from TrasanctionStream
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
        insert into CleanedTrasanctionStream;

      4. You have now completed the query that extracts and cleans the information you need. As you proceed with this tutorial, you will add more queries to this Siddhi application to enrich and load the data. To make the application easy for other users to understand, name this query CleaningData via the @info annotation. The completed query looks as follows.

        Panel

        @info(name = 'CleaningData')
        from TrasanctionStream
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
        insert into CleanedTrasanctionStream;

        The Siddhi application currently looks as follows:

        Code Block
        define stream TrasanctionStream (userId long, transactionAmount double, location string);
        
        @info(name = 'CleaningData')  
        from TrasanctionStream 
        select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location 
        insert into CleanedTrasanctionStream;
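        To make the behaviour of the CleaningData query concrete, the following plain-Python sketch models it outside Siddhi. It is not Siddhi code and is not part of the Siddhi application: None stands in for a Siddhi null, if_then_else is a hypothetical stand-in for ifThenElse(), and the sample events are invented for illustration.

```python
from typing import NamedTuple, Optional


class Transaction(NamedTuple):
    """One TrasanctionStream event (userId long, transactionAmount double, location string)."""
    userId: int
    transactionAmount: float
    location: Optional[str]  # None models a Siddhi null


def if_then_else(condition: bool, then_value, else_value):
    """Hypothetical stand-in for Siddhi's ifThenElse(condition, then, else)."""
    return then_value if condition else else_value


def cleaning_data(event: Transaction) -> Transaction:
    """Models the CleaningData query: userId and transactionAmount pass through
    unchanged, and a null location is replaced with "UNKNOWN"."""
    location = if_then_else(event.location is None, "UNKNOWN", event.location)
    return Transaction(event.userId, event.transactionAmount, location)


if __name__ == "__main__":
    # Hypothetical raw events; the second one has no location (null).
    raw = [Transaction(1, 250.0, "Colombo"), Transaction(2, 99.5, None)]
    for cleaned in map(cleaning_data, raw):
        print(cleaned)
```

        Running the sketch prints the first event unchanged and the second one with its location set to UNKNOWN, which is the output that CleanedTrasanctionStream receives in the Siddhi application.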
  2. The information you have extracted and cleaned needs to be enriched. To do this, let's add the required queries as follows.
    1. To enrich the data from the CleanedTrasanctionStream stream, its events need to be joined with records that are already saved in a database table. To do this, the table needs to be defined within the Siddhi application. Therefore, let's add the table definition as follows.

      Panel

      define stream TrasanctionStream (userId long, transactionAmount double, location string);

      define table UserTable (userId long, firstName string, lastName string);

      @info(name = 'CleaningData')
      from TrasanctionStream
      select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTrasanctionStream;

    2. To use the information in this table, reference the related data source via the @store annotation as follows.

      Panel

      define stream TrasanctionStream (userId long, transactionAmount double, location string);

      @store(type = 'rdbms', datasource = 'UserDataDB') 
      define table UserTable (userId long, firstName string, lastName string);


      @info(name = 'CleaningData')
      from TrasanctionStream
      select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
      insert into CleanedTrasanctionStream;

      Tip

      To make it easy for other users to understand the purpose of this table, you can also add a comment for the table as shown below.

      Panel

      -- Table used to enrich data
      @store(type = 'rdbms', datasource = 'UserDataDB')
      define table UserTable (userId long, firstName string, lastName string);

    3. You have already directed the extracted and cleaned information to the CleanedTrasanctionStream stream. Therefore, let's specify that stream as the input stream for this query.

      Panel

      from CleanedTrasanctionStream

    4. The user IDs of the registered salespeople are already saved in the UserTable table. To enrich the data, the user IDs of the events in the CleanedTrasanctionStream stream need to be joined with the user IDs stored in the table. For this, let's add a join query as follows.

      Panel

      from CleanedTrasanctionStream as c join UserTable as u on c.userId == u.userId

    5. Now, let's derive a user name for each salesperson by concatenating the first name and the last name with the str:concat() function of the Siddhi string extension, as follows.

      Panel

      select c.userId, str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location

    6. The enriched data can be directed to another inferred output stream named EnrichedTrasanctionStream.

      Panel

      from CleanedTrasanctionStream as c join UserTable as u
      on c.userId == u.userId
      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location
      insert into EnrichedTrasanctionStream;

    7. Let's complete the query you created for enriching data by naming it EnrichData.

      Panel

      @info(name = 'EnrichData')
      from CleanedTrasanctionStream as c join UserTable as u
      on c.userId == u.userId
      select c.userId,
      str:concat( u.firstName, " ", u.lastName) as userName,
      transactionAmount,
      location
      insert into EnrichedTrasanctionStream;

      Now the partially completed Siddhi application looks as follows:

      Code Block
      define stream TrasanctionStream (userId long, transactionAmount double, location string);
      
      -- Table used to enrich data 
      @store(type = 'rdbms', datasource = 'UserDataDB') 
      define table UserTable (userId long, firstName string, lastName string);
      
      @info(name = 'CleaningData')  
      from TrasanctionStream 
      select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location 
      insert into CleanedTrasanctionStream;
      
      @info(name = 'EnrichData')  
      from CleanedTrasanctionStream as c join UserTable as u 
          on c.userId == u.userId 
      select c.userId,  
             str:concat( u.firstName, " ", u.lastName) as userName,  
             transactionAmount,  
             location 
      insert into EnrichedTrasanctionStream;
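      The EnrichData query joins each cleaned event with UserTable and builds the user name with str:concat(). The plain-Python sketch below models this as an inner join: in this model, an event produces output only when UserTable holds a row with the same userId, and a userId that matches several rows produces one output event per row. The table rows, sample events and helper names are hypothetical, and the table is held in memory rather than in the UserDataDB data source.

```python
from typing import List, NamedTuple


class CleanedTransaction(NamedTuple):
    userId: int
    transactionAmount: float
    location: str


class UserRow(NamedTuple):
    userId: int
    firstName: str
    lastName: str


class EnrichedTransaction(NamedTuple):
    userId: int
    userName: str
    transactionAmount: float
    location: str


def concat(*parts: str) -> str:
    """Hypothetical stand-in for str:concat(): joins its arguments with no separator."""
    return "".join(parts)


def enrich_data(events: List[CleanedTransaction],
                user_table: List[UserRow]) -> List[EnrichedTransaction]:
    """Models the EnrichData query as an inner join on userId.

    Every (event, row) pair satisfying c.userId == u.userId yields one output
    event; an event with no matching row yields nothing.
    """
    enriched: List[EnrichedTransaction] = []
    for c in events:
        for u in user_table:
            if c.userId == u.userId:  # join condition: on c.userId == u.userId
                enriched.append(EnrichedTransaction(
                    c.userId,
                    concat(u.firstName, " ", u.lastName),
                    c.transactionAmount,
                    c.location,
                ))
    return enriched


if __name__ == "__main__":
    # Hypothetical table rows and cleaned events.
    users = [UserRow(1, "Ann", "Perera"), UserRow(2, "Raj", "Silva")]
    events = [CleanedTransaction(1, 250.0, "Colombo"),
              CleanedTransaction(3, 10.0, "UNKNOWN")]  # userId 3 is not in the table
    for e in enrich_data(events, users):
        print(e)
```

      Only the event for userId 1 is printed, with the user name "Ann Perera"; the event for userId 3 is dropped because the join finds no matching row.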
  3. You have now added the queries required to extract, clean and enrich the data. To save the enriched data in another database table, follow the steps given below.

    1. Each enriched record that is stored should include the user ID, the user name, the transaction amount and the location. Let's add the definition of the table to store the enriched data as shown below.

      Panel
      -- Final table to load the data
      @store(type = 'rdbms', datasource = 'TransactionDataDB')
      define table TrasanctionTable (userId long, userName string, transactionAmount double, location string);

    2. To insert the enriched data into the table you defined, let's add another Siddhi query as follows. This query takes the enriched events from the EnrichedTrasanctionStream inferred stream and inserts them into the TrasanctionTable table.

      Panel
      @info(name = 'LoadData')
      from EnrichedTrasanctionStream
      insert into TrasanctionTable;
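      The LoadData query writes each EnrichedTrasanctionStream event as a row of the target table. The plain-Python sketch below models this step under one stated assumption: an event can be stored only if its attributes line up, by name and order, with the table's columns. The column lists and the sample event are hypothetical, and the sketch does not connect to the TransactionDataDB data source.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def load_data(events: List[Row], table: List[Row], columns: Sequence[str]) -> int:
    """Models `from EnrichedTrasanctionStream insert into <table>`.

    Each event becomes one row. Assumption of this sketch: an event is
    accepted only if its attribute names match the table columns in order.
    Returns the number of rows inserted.
    """
    inserted = 0
    for event in events:
        if list(event) != list(columns):
            raise ValueError(
                f"event attributes {list(event)} do not match table columns {list(columns)}"
            )
        table.append(dict(event))
        inserted += 1
    return inserted


if __name__ == "__main__":
    # Hypothetical enriched event, shaped like EnrichedTrasanctionStream.
    enriched = [{"userId": 1, "userName": "Ann Perera",
                 "transactionAmount": 250.0, "location": "Colombo"}]
    rows: List[Row] = []
    # A column list that includes userName accepts the event.
    print(load_data(enriched, rows, ["userId", "userName", "transactionAmount", "location"]))
    # A column list without userName rejects it.
    try:
        load_data(enriched, [], ["userId", "transactionAmount", "location"])
    except ValueError as err:
        print("rejected:", err)
```

      With these inputs, the first call stores the event and returns 1, while the second call is rejected because its column list has no userName.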
  4. The sales transactions arrive as JMS messages in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format. Add it directly above the TrasanctionStream definition, as shown below.

    Panel

    @source(type = 'jms', destination = 'transactions',
    factory.initial = '...', provider.url = '...',
    @map(type = 'json',
    @attributes('$.userId', '$.transactionAmount', '$.location')))
    define stream TrasanctionStream (userId long, transactionAmount double, location string);
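    The @attributes annotation maps each JSONPath expression, in order, to an attribute of TrasanctionStream. The plain-Python sketch below models that mapping for a single JSON message. It supports only top-level paths of the form $.name, treats a JSON null as a Siddhi null (None), and is not the implementation of the Siddhi JSON mapper; the function name and sample messages are hypothetical. It also shows how a null location can reach the CleaningData query.

```python
import json
from typing import Optional, Sequence, Tuple

# JSONPath expressions copied from the @attributes annotation, in the same
# order as the TrasanctionStream attributes (userId, transactionAmount, location).
ATTRIBUTE_PATHS: Tuple[str, ...] = ("$.userId", "$.transactionAmount", "$.location")


def map_json_message(message: str,
                     paths: Sequence[str] = ATTRIBUTE_PATHS) -> Tuple[int, float, Optional[str]]:
    """Simplified model of a custom JSON input mapping.

    Only top-level paths of the form "$.name" are supported; a JSON null
    becomes None, which plays the role of a Siddhi null.
    """
    payload = json.loads(message)
    values = []
    for path in paths:
        if not path.startswith("$.") or "." in path[2:]:
            raise ValueError(f"unsupported JSONPath in this sketch: {path}")
        values.append(payload[path[2:]])  # KeyError if the field is absent
    user_id, amount, location = values
    # Convert to the Python types that correspond to long and double.
    return int(user_id), float(amount), location


if __name__ == "__main__":
    print(map_json_message('{"userId": 1, "transactionAmount": 250, "location": "Colombo"}'))
    print(map_json_message('{"userId": 2, "transactionAmount": 99.5, "location": null}'))
```

    The second message carries a null location, so the resulting event reaches TrasanctionStream with no location, and the CleaningData query replaces it with UNKNOWN.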

...