...
- First, let's enter the required Siddhi queries to extract and clean data.
The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesperson), the transaction amount, and the location. Let's begin by adding the stream definition to capture this information.

```sql
define stream TransactionStream (userId long, transactionAmount double, location string);
```
- Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values and replacing them with default values. To do this, let's follow the steps below:
The information to be extracted and cleaned is taken from the TransactionStream input stream that you previously defined. To specify this, let's add a from statement as follows.

```sql
from TransactionStream
```
Let's select the data you need to extract and clean. This includes the user ID, the transaction amount, and the location. For travelling salespeople, specific locations are not registered in the system. As a result, no value is specified in the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the null value for the location attribute of such events with unknown. This can be achieved via the ifThenElse() function of Siddhi by including it in the select clause as shown below.

```sql
from TransactionStream
select userId, transactionAmount,
    ifThenElse(location is null, "UNKNOWN", location) as location
```
Now you can add an insert into clause so that the extracted and cleaned information is directed to an inferred output stream.

```sql
from TransactionStream
select userId,
    transactionAmount,
    ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
```
Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application in order to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query CleaningData. The completed query looks as follows.

```sql
@info(name = 'CleaningData')
from TransactionStream
select userId,
    transactionAmount,
    ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
```
The Siddhi application currently looks as follows:
```sql
define stream TransactionStream (userId long, transactionAmount double, location string);

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount,
    ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
```
- The information you have extracted and cleaned needs to be enriched. To do this, let's add the required queries as follows.
To enrich the data from the CleanedTransactionStream stream, events need to be joined with records that are already saved in a database table. To do this, the table needs to be defined within the Siddhi application. Therefore, let's add the table definition with a reference to the database table as follows.

```sql
define stream TransactionStream (userId long, transactionAmount double, location string);

define table UserTable (userId long, firstName string, lastName string);

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
```
To use the information in this table, make a reference to the related data source via the @store annotation as follows.

```sql
define stream TransactionStream (userId long, transactionAmount double, location string);

@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, lastName string);

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
```
Tip: To make it easy for other users to understand the purpose of this table, you can also add a comment for the table as shown below.

```sql
-- Table used to enrich data
@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, lastName string);
```
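For the @store annotation to resolve, the UserDataDB datasource must be defined in the Stream Processor's deployment.yaml configuration. A minimal sketch, assuming a local MySQL database named userdb; the JDBC URL, credentials, and driver class are placeholders for your own environment:

```yaml
wso2.datasources:
  dataSources:
    # Datasource referenced by the UserTable @store annotation
    - name: UserDataDB
      description: Datasource holding registered user records
      definition:
        type: RDBMS
        configuration:
          jdbcUrl: 'jdbc:mysql://localhost:3306/userdb'
          username: root
          password: root
          driverClassName: com.mysql.jdbc.Driver
```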
You have already directed the extracted and cleaned information to the CleanedTransactionStream stream. Therefore, let's specify that stream as the input stream for this query.

```sql
from CleanedTransactionStream
```
The user IDs of the registered salespeople are already saved in the UserTable table. To enrich the data, the user IDs of the events in the CleanedTransactionStream stream need to be joined with the user IDs stored in the table. For this, let's add a join query as follows.

```sql
from CleanedTransactionStream as c join UserTable as u
    on c.userId == u.userId
```
Now, let's include a concatenation to derive a username from the first names and last names of the salespeople as follows.

```sql
select c.userId,
    str:concat(u.firstName, " ", u.lastName) as userName,
    transactionAmount,
    location
```
The enriched data can be directed to another inferred output stream named EnrichedTransactionStream.

```sql
from CleanedTransactionStream as c join UserTable as u
    on c.userId == u.userId
select c.userId,
    str:concat(u.firstName, " ", u.lastName) as userName,
    transactionAmount,
    location
insert into EnrichedTransactionStream;
```
Let's complete the query you created for enriching data by naming it EnrichData.

```sql
@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
    on c.userId == u.userId
select c.userId,
    str:concat(u.firstName, " ", u.lastName) as userName,
    transactionAmount,
    location
insert into EnrichedTransactionStream;
```
Now the partially completed Siddhi application looks as follows:
```sql
define stream TransactionStream (userId long, transactionAmount double, location string);

-- Table used to enrich data
@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, lastName string);

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount,
    ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
    on c.userId == u.userId
select c.userId,
    str:concat(u.firstName, " ", u.lastName) as userName,
    transactionAmount,
    location
insert into EnrichedTransactionStream;
```
Now you have added the required configurations to extract data from JMS and enrich it. To save the enriched data in another database table, let's follow the steps given below.
Each enriched record stored should include the user ID, the transaction amount, and the location. Let's add the definition of the table to store the enriched data as shown below.

```sql
-- Final table to load the data
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (userId long, transactionAmount double, location string);
```

To insert the enriched data into the table you created, let's add another Siddhi query as follows. This query takes the enriched events from the EnrichedTransactionStream inferred stream (which receives the enriched data) and inserts them into the TransactionTable table you defined. Because the table stores only the user ID, the transaction amount, and the location, the query selects only those attributes.

```sql
@info(name = 'LoadData')
from EnrichedTransactionStream
select userId, transactionAmount, location
insert into TransactionTable;
```
The transactions are generated via JMS in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format, so that the Siddhi application can receive the incoming messages. You can add this above the stream definition as shown below.

```sql
@source(type = 'jms', destination = 'transactions',
    factory.initial = '...', provider.url = '...',
    @map(type = 'json',
        @attributes('$.userId', '$.transactionAmount', '$.location')))
define stream TransactionStream (userId long, transactionAmount double, location string);
```
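With this mapping, each incoming JMS message is a JSON object whose fields are picked out by the JSONPath expressions in the @attributes annotation. A hypothetical example message (the values are purely illustrative):

```json
{
  "userId": 2345,
  "transactionAmount": 456.0,
  "location": "Colombo"
}
```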
...
Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location based on the latest transactions every five minutes. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that the database contains the latest transactions. You can perform real-time ETL via the WSO2 Stream Processor to achieve this as shown below.
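As a rough illustration of the kind of query involved (a sketch only, not one of this tutorial's verbatim steps; the query name and the SalesAnalysisStream output stream are assumed names), a five-minute time batch window over the enriched stream can aggregate sales per location:

```sql
-- Sketch: every five minutes, emit the total sales per location
-- observed during that window (assumed names, for illustration).
@info(name = 'AnalyzeSales')
from EnrichedTransactionStream#window.timeBatch(5 min)
select location, sum(transactionAmount) as totalSales
group by location
insert into SalesAnalysisStream;
```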
...