Introduction
In many real-life scenarios that involve integrated enterprises, data needs to be moved from one location to another and processed for future reference. This process is commonly known as Extract, Transform, and Load (ETL). In this tutorial, you learn how ETL is carried out both in real time and periodically.
Let's consider that the Sweet Factory has shops distributed all over the world. In addition, there are travelling sales people who operate from mobile sweet trucks. Each salesperson records transactions via a system that generates JMS messages. This data needs to be saved in a database so that it can be used later for sales analysis and financial analysis. Later, it needs to be moved to other databases depending on the nature of the analysis carried out.
Tutorial steps
This section covers two scenarios that are simple examples of performing real-time and periodic ETL in real-world business scenarios.
Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.
Once the head office of the Sweet Factory receives events with information about the sales transactions, that information needs to be cleaned and enriched via a connection with a data store. Then the enriched version of the data needs to be saved in another RDBMS store.
Let's get started!
The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesperson), the transaction amount, and the location. Let's begin by adding the stream definition to capture this information.
define stream TransactionStream (userId long, transactionAmount double, location string);
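A sample input message in JSON format (also shown as a comment in the complete application later in this scenario; the values are only illustrative) looks as follows.
{"userId": 2345, "transactionAmount": 456.0, "location": "CA, USA"}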
- Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values, and replacing them with default values. To do this, let's follow the steps below:
First, let's select the data you need to extract and clean. This includes the user ID, the transaction amount, and the location. For travelling sales people, specific locations are not registered in the system. As a result, no value is specified for the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the null value of the location attribute of such events with unknown. This can be achieved via the ifThenElse() function of Siddhi by including it in the select clause as shown below.
select userId,
transactionAmount,
ifThenElse(location is null, "UNKNOWN", location) as location
The information specified in the select clause given above is taken from the TransactionStream input stream that you previously defined. To specify this, let's add a from clause as follows.
from TransactionStream
select userId,
transactionAmount,
ifThenElse(location is null, "UNKNOWN", location) as location
Now you can add an inferred output stream to which the extracted and cleaned information is directed.
from TransactionStream
select userId,
transactionAmount,
ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query CleaningData. The completed query looks as follows.
@info(name = 'CleaningData')
from TransactionStream
select userId,
transactionAmount,
ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
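To illustrate what this query does (the values below are hypothetical), an incoming event with a missing location is emitted with the location set to UNKNOWN.
-- Input event on TransactionStream (hypothetical values):
--   userId: 2345, transactionAmount: 456.0, location: null
-- Output event on CleanedTransactionStream:
--   userId: 2345, transactionAmount: 456.0, location: "UNKNOWN"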
- The information you have extracted and cleaned needs to be enriched. To do this, let's create another query named EnrichData as follows.
You have already directed the extracted and cleaned information to the CleanedTransactionStream stream. Therefore, let's specify that stream as the input stream for this query.
from CleanedTransactionStream
The user IDs of the registered sales people are already saved in a table named UserTable. To enrich the data, the user IDs of the events in the CleanedTransactionStream stream need to be joined with the user IDs stored in the table. For this, let's add a join as follows.
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
Now, let's include a concatenation to derive a user name from the first name and the last name of the sales people as follows.
select c.userId,
str:concat( u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
The enriched data can be directed to another inferred output stream named EnrichedTransactionStream.
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat( u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;
Let's complete the query you created for enriching data by naming it EnrichData.
@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat( u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;
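To illustrate the enrichment (the UserTable record below is hypothetical), a cleaned event is joined with the matching user record, and a user name is derived via str:concat().
-- Hypothetical record in UserTable:
--   userId: 2345, firstName: "John", lastName: "Doe"
-- Input event on CleanedTransactionStream:
--   userId: 2345, transactionAmount: 456.0, location: "CA, USA"
-- Output event on EnrichedTransactionStream:
--   userId: 2345, userName: "John Doe", transactionAmount: 456.0, location: "CA, USA"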
In the EnrichData Siddhi query you created, you included a join between the CleanedTransactionStream stream and a table named UserTable. For this query to be executed, the definitions of both the stream and the table need to be included in the same Siddhi application. You have already added the stream definition. Now let's add the table definition above the Siddhi queries as follows. More table definitions are added as you proceed with the tutorial. Therefore, it is useful to specify the purpose of each table definition via a comment as shown below.
define stream TransactionStream (userId long,
transactionAmount double, location string);

-- Table used to enrich data
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table UserTable (userId long, firstName string,
lastName string);
@info(name = 'CleaningData')
from TransactionStream
select userId,
transactionAmount,
ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat( u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;

Let's create another table to store the enriched data as shown below.
-- Final table to load the data
@store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable (userId long, userName string,
transactionAmount double, location string);
To insert the enriched data into the table you created, let's add another Siddhi query as follows:
@info(name = 'LoadData')
from EnrichedTransactionStream
insert into TransactionTable;

The transactions are received via JMS in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format. You can add this above the stream definition as shown below.
@source(type = 'jms', destination = 'transactions',
factory.initial = '...', provider.url = '...',
@map(type = 'json',
@attributes('$.userId', '$.transactionAmount', '$.location')))
define stream TransactionStream (userId long, transactionAmount double, location string);
Let's name this Siddhi application JMS-to-DB-ETL. You can optionally add a description too. The complete Siddhi application looks as follows:
@App:name("JMS-to-DB-ETL") @App:description("Extract data from JMS, perform stateless transformation, and load to database") -- Sample input message: {“userId”:2345, -- “transactionAmount”:456.0, -- “location”: "CA, USA”} @sink(type = 'jms' , destination = 'transactions', factory.initial = '...', provider.url = '...', @map(type = 'json', @attributes('$.userId', '$.transactionAmount', '$.location'))) define stream TrasanctionStream (userId long, transactionAmount double, location string); -- Table used to enrich data @store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE') define table UserTable (userId long, firstName string, lastName string); -- Final table to load the data @store(type = 'rdbms' , datasource = 'TRANSACTION_DATA_SOURCE') define stream TrasanctionTable (userId long, transactionAmount double, location string); @info(name = 'CleaningData') from TrasanctionStream select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location insert into CleanedTrasanctionStream; @info(name = 'EnrichData') from CleanedTrasanctionStream as c join UserTable as u on c.userId == u.userId select c.userId, str:concat( u.firstName, " ", u.lastName) as userName, transactionAmount, location insert into EnrichedTrasanctionStream; @info(name = 'LoadData') from EnrichedTrasanctionStream insert into TransactionTable;
Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka
Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location based on the latest transactions every five minutes. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that the database contains the latest transactions. You can achieve this via WSO2 Stream Processor as shown below.
Let's get started!
The information you need to transform and load to Kafka has to be extracted from a data store, and only the transactions that have not been processed before should be extracted each time. To keep track of the last processed transaction ID, let's start by adding an in-memory table definition to the Siddhi application as follows.
-- In-memory Table to keep last processed transaction ID
@primaryKey('key')
define table TriggerStateTable (key string, lastProcessedId long);

Once the information is extracted and transformed, the transformed data needs to be directed to an output stream. This output stream can be defined as follows.
define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);

The database table needs to be polled every five minutes for transactions that arrived after the last processed ID. To trigger this polling, you need a stream in which a new event is generated every five minutes. Therefore, let's add a trigger as follows:
define trigger TriggerStream at every 5 min;
define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);

-- In-memory Table to keep last processed transaction ID
@primaryKey('key')
define table TriggerStateTable (key string, lastProcessedId long);
- To collect the last processed transaction ID, you need to add a Siddhi query as follows.
To extract the last processed ID, the events triggered in the TriggerStream stream as well as the events stored in the TriggerStateTable table need to be considered. Therefore, let's add a Siddhi join between this stream and the table as follows.
from TriggerStream as s right outer join TriggerStateTable as t
The data collected needs to be inserted into another stream so that it can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.
from TriggerStream as s right outer join TriggerStateTable as t
insert into DataRetrievalStream;
Now you need to specify how the last processed ID is selected from the joined TriggerStream stream and TriggerStateTable table, and inserted into the DataRetrievalStream. Let's add a select clause as follows.
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId)
as lastProcessedId
insert into DataRetrievalStream;
In the completed Siddhi application, this query is named CollectLastProcessedId.
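The outer join and the ifThenElse() function are designed to handle the first run, when the TriggerStateTable table does not contain a record yet: the last processed ID then defaults to 0, so every transaction in the store is treated as unprocessed. The behaviour below is an illustration of that design.
-- First run (no record in TriggerStateTable yet): t.lastProcessedId is null,
-- so ifThenElse() emits 0.
--   DataRetrievalStream event: lastProcessedId: 0
-- Later runs: the stored value is emitted as it is.
--   DataRetrievalStream event: lastProcessedId: <last processed transaction ID>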
Let's define a data store to save the transaction data as follows.
-- Store table
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable(transactionId long, userId long,
transactionAmount double, transactionLocation string);
- Now you have created a query that collects the last processed ID from the triggered stream and the table via a join. Next, you need to use that ID to extract the transactions that have not yet been processed from the data store. To do this, let's create a Siddhi query as follows.
Transactions should be taken from the TransactionTable only if their transaction IDs are greater than the last processed ID carried by the event in the DataRetrievalStream stream. Therefore, let's join this stream and the table so that only unprocessed transactions are selected.
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
The data extracted needs to be directed to an output stream. Therefore, let's add an output stream named ExtractedDataStream that can be inferred.
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
insert into ExtractedDataStream;
Now let's select the information to be inserted into the ExtractedDataStream stream from each extracted transaction.
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
The completed query looks as follows.
Note that the query is named ExtractData. It is recommended to name each query to state its purpose when there are multiple queries in the Siddhi application.
@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
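To illustrate the extraction (the table contents below are hypothetical), only the transactions with IDs greater than the last processed ID are emitted.
-- DataRetrievalStream event: lastProcessedId: 100
-- Rows in TransactionTable: transactionId 100, 101, and 102
-- Events emitted to ExtractedDataStream: the transactions with IDs 101 and 102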
- You need to update the TriggerStateTable table with the last processed ID identified after the join between the table and the stream. To do this, let's create a Siddhi query as follows.
The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream, to which the extracted transactions are sent every five minutes. Therefore, let's add this stream as the input stream.
from ExtractedDataStream
The event with the last processed ID may already exist in the TriggerStateTable table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If that event does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.
from ExtractedDataStream
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";
When selecting records to be updated or inserted into the table, the records need to be identified by a unique key. Therefore, let's add a select clause that sets the literal "lastProcessedId" as the key and the transaction ID as the lastProcessedId value.
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";
The completed query looks as follows.
@info(name='UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";
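Continuing the hypothetical example above, each extracted transaction updates the same record in the TriggerStateTable table, so after the transactions with IDs 101 and 102 are processed, the table holds a single record as follows.
-- Record in TriggerStateTable after processing:
--   key: "lastProcessedId", lastProcessedId: 102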
- Now let's perform a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query to carry out this analysis.
The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TransformedDataStream. Therefore, let's add the from and insert into clauses accordingly.
from ExtractedDataStream
insert into TransformedDataStream
As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.
from ExtractedDataStream#window.timeBatch(5 min)
insert into TransformedDataStream
The attributes in the ExtractedDataStream stream are transactionId, transactionLocation, and transactionAmount. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values for the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
insert into TransformedDataStream;
The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by the location when presenting the results.
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
group by transactionLocation
insert into TransformedDataStream;
Now let's name the query and complete it as follows.
@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
group by transactionLocation
insert into TransformedDataStream;
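To illustrate the aggregation (the events below are hypothetical), the transactions received within one five-minute window are grouped by location as follows.
-- Events on ExtractedDataStream within one window:
--   transactionId: 101, transactionAmount: 100.0, transactionLocation: "CA, USA"
--   transactionId: 102, transactionAmount: 300.0, transactionLocation: "CA, USA"
--   transactionId: 103, transactionAmount: 200.0, transactionLocation: "NY, USA"
-- Events emitted to TransformedDataStream at the end of the window:
--   "CA, USA", totalTransactionAmount: 400.0, avgTransactionAmount: 200.0, transactionId: 102
--   "NY, USA", totalTransactionAmount: 200.0, avgTransactionAmount: 200.0, transactionId: 103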
The processed information needs to be published via Kafka. Therefore, let's add a Kafka sink as shown below.
-- Sink to load the data
@sink(type='kafka', bootstrap.servers='...',
topic='...', is.binary.message='...',
@map(type='xml'))
This sink publishes data from the TransformedDataStream stream, which is the output stream that receives the data processed by the StatefulAggregation Siddhi query. Therefore, let's connect the sink to that stream by adding the stream definition below it as shown below.
-- Sink to load the data
@sink(type='kafka', bootstrap.servers='...',
topic='...', is.binary.message='...',
@map(type='xml'))
define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);
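Assuming Siddhi's default XML mapping (no custom payload mapping is specified), a message published to the Kafka topic could look similar to the following. The values are illustrative.
<events>
    <event>
        <transactionLocation>CA, USA</transactionLocation>
        <totalTransactionAmount>400.0</totalTransactionAmount>
        <avgTransactionAmount>200.0</avgTransactionAmount>
        <transactionId>102</transactionId>
    </event>
</events>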
Now let's name the Siddhi application and complete it. The completed version looks as follows.
@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to Kafka')

define trigger TriggerStream at every 5 min;

-- Sink to load the data
@sink(type='kafka', bootstrap.servers='...',
topic='...', is.binary.message='...',
@map(type='xml'))
define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);

-- Store table
@store(type = 'rdbms', datasource = 'TRANSACTION_DATA_SOURCE')
define table TransactionTable(transactionId long, userId long,
transactionAmount double, transactionLocation string);

-- In-memory Table to keep last processed transaction ID
@primaryKey('key')
define table TriggerStateTable (key string, lastProcessedId long);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId)
as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
group by transactionLocation
insert into TransformedDataStream;