Performing Real-time and Periodic ETL
Introduction
In many real-life scenarios involving integrated enterprises, data needs to be loaded and moved from one location to another and processed for future reference. This process is commonly known as Extract, Transform, Load (ETL). In this tutorial, you will learn how ETL is carried out in real time and periodically.
Let's consider that the Sweet Factory has shops distributed all over the world. In addition, there are travelling salespeople who operate from mobile sweet trucks. Each salesperson reports transactions via a system that generates JMS messages. This data needs to be saved in a database so that it can be used later for sales analysis and financial analysis. Later, the data needs to be moved to other databases depending on the nature of the analysis carried out.
Before you begin:
In this scenario, you need to enrich the information sent by salespeople by updating the events they generate based on records stored in MySQL. You need to download and install MySQL, and create the required databases and tables, before you carry out the tutorial steps.
Tutorial steps
This section covers two inter-related scenarios that are simple examples of performing real-time and periodic ETL in real-world business scenarios.
Scenario 1: Extract data from JMS, perform a stateless transformation, and load to a database.
Once the head office of the Sweet Factory receives events with information about the sales transactions, that information needs to be cleaned and enriched via a connection with a data store. Then the enriched version of the data needs to be saved in another RDBMS store.
Let's get started!
- First, let's enter the required Siddhi queries to extract and clean data.
The sales transactions reported as JMS messages include the user ID (i.e., the ID of the salesperson), the transaction amount, and the location. Let's begin by adding the stream definition to capture this information.

define stream TransactionStream (userId long, transactionAmount double, location string);
- Before enriching the data for further analysis, it needs to be cleaned. This involves checking for null values and replacing them with default values. To do this, let's follow the steps below:
Let's select the data you need to extract and clean. This includes the user ID, the transaction amount, and the location. For travelling salespeople, specific locations are not registered in the system. As a result, no value is specified in the location attribute of the events generated for some sales actions. To clean the data before enriching it, you need to replace the null value for the location attribute of such events with unknown. This can be achieved via the ifThenElse() function of Siddhi by including it in the select clause as shown below.

select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location

The information specified in the select clause above is taken from the TransactionStream input stream that you previously defined. To specify this, let's add a from statement as follows.

from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location

Now you can add an output stream to be inferred so that the information extracted and cleaned can be directed to it.

from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

Now you have completed the query for extracting and cleaning the information you need. You will be adding more queries to this Siddhi application in order to enrich and load data as you proceed with this tutorial. Therefore, to make it easy for other users to understand this Siddhi application, let's name this query CleaningData. The completed query looks as follows.

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

The Siddhi application currently looks as follows:

define stream TransactionStream (userId long, transactionAmount double, location string);

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;
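For intuition, the null-handling performed by the cleaning query can be sketched in Python. This is a hypothetical helper for illustration only, not part of the Siddhi application:

```python
def clean_transaction(event):
    """Replace a missing location with "UNKNOWN", mirroring the
    ifThenElse(location is null, "UNKNOWN", location) expression."""
    return {
        "userId": event["userId"],
        "transactionAmount": event["transactionAmount"],
        "location": event["location"] if event["location"] is not None else "UNKNOWN",
    }

cleaned = clean_transaction({"userId": 2345, "transactionAmount": 456.0, "location": None})
print(cleaned["location"])  # UNKNOWN
```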
- The information you have extracted and cleaned needs to be enriched. To do this, let's add the required queries as follows.
To enrich the data from the CleanedTransactionStream stream, events need to be joined with records that are already saved in a database table. To do this, the table needs to be defined within the Siddhi application. Therefore, let's add the table definition as follows.

define table UserTable (userId long, firstName string, lastName string);

To use the information in this table, make a reference to the related data source via the @store annotation as follows.

@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, lastName string);

To make it easy for other users to understand the purpose of this table, you can also add a comment for the table as shown below.

-- Table used to enrich data
@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, lastName string);

You have already directed the extracted and cleaned information to the CleanedTransactionStream stream. Therefore, let's specify that stream as the input stream for this query.

from CleanedTransactionStream

The user IDs of the registered salespeople are already saved in the UserTable table. To enrich the data, user IDs of the events in the CleanedTransactionStream stream need to be joined with the user IDs stored in the table. For this, let's add a join query as follows.

from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId

Now, let's include a concatenation to derive a username from the first names and last names of the salespeople as follows.

select c.userId,
str:concat(u.firstName, " ", u.lastName) as userName,
transactionAmount,
location

The enriched data can be directed to another inferred output stream named EnrichedTransactionStream.

from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat(u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;

Let's complete the query you created for enriching data by naming it EnrichData.

@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat(u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;

Now the partially completed Siddhi application looks as follows:

define stream TransactionStream (userId long, transactionAmount double, location string);

-- Table used to enrich data
@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string, lastName string);

@info(name = 'CleaningData')
from TransactionStream
select userId, transactionAmount, ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat(u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;
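The enrichment join can likewise be sketched in Python. The user record below is a hypothetical sample; the helper only illustrates the join-and-concatenate logic, not the Siddhi runtime:

```python
def enrich(event, user_table):
    """Join a cleaned event with its user record and derive a display name,
    mirroring the join with UserTable and str:concat in the EnrichData query."""
    user = user_table[event["userId"]]
    return {
        "userId": event["userId"],
        "userName": f'{user["firstName"]} {user["lastName"]}',
        "transactionAmount": event["transactionAmount"],
        "location": event["location"],
    }

users = {2345: {"firstName": "Jane", "lastName": "Doe"}}  # hypothetical sample record
enriched = enrich({"userId": 2345, "transactionAmount": 456.0, "location": "CA, USA"}, users)
print(enriched["userName"])  # Jane Doe
```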
Now you have added the required configurations to extract data from JMS and enrich it. To save the enriched data in another database table, let's follow the steps given below.
Each enriched record stored should include the user ID, transaction amount and the location. Let's add the definition of the table to store the enriched data as shown below.
-- Final table to load the data
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (userId long, transactionAmount double, location string);

To insert the enriched data into the table you created, let's add another Siddhi query as follows. This query takes the enriched events from the EnrichedTransactionStream inferred stream (which receives the enriched data), and inserts them into the TransactionTable table you defined.

@info(name = 'LoadData')
from EnrichedTransactionStream
insert into TransactionTable;
The transactions are reported via JMS in JSON format. Therefore, let's add an event source with JMS as the transport and JSON as the format. You can add this above the stream definition as shown below.

@source(type = 'jms', destination = 'transactions',
factory.initial = '...', provider.url = '...',
@map(type = 'json',
@attributes('$.userId', '$.transactionAmount', '$.location')))
define stream TransactionStream (userId long, transactionAmount double, location string);
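Conceptually, the @map/@attributes configuration picks the JSON fields named by the '$.field' expressions out of each incoming message and binds them to the stream's attributes, in order. A rough Python sketch of that mapping (for illustration only; Siddhi's mapper supports full JSONPath):

```python
import json

def map_json(payload, fields=("userId", "transactionAmount", "location")):
    """Extract the named top-level fields from a JSON payload as a tuple,
    analogous to @attributes('$.userId', '$.transactionAmount', '$.location')."""
    doc = json.loads(payload)
    return tuple(doc.get(f) for f in fields)

print(map_json('{"userId": 2345, "transactionAmount": 456.0, "location": "CA, USA"}'))
# (2345, 456.0, 'CA, USA')
```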
Let's name this Siddhi application JMS-to-DB-ETL. You can optionally add a description too. The complete Siddhi application looks as follows:
@App:name("JMS-to-DB-ETL")
@App:description("Extract data from JMS, perform stateless transformation, and load to database")

-- Sample input message: {"userId": 2345,
-- "transactionAmount": 456.0,
-- "location": "CA, USA"}
@source(type = 'jms', destination = 'transactions',
factory.initial = '...', provider.url = '...',
@map(type = 'json',
@attributes('$.userId', '$.transactionAmount', '$.location')))
define stream TransactionStream (userId long,
transactionAmount double, location string);

-- Table used to enrich data
@store(type = 'rdbms', datasource = 'UserDataDB')
define table UserTable (userId long, firstName string,
lastName string);

-- Final table to load the data
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (userId long,
transactionAmount double, location string);

@info(name = 'CleaningData')
from TransactionStream
select userId,
transactionAmount,
ifThenElse(location is null, "UNKNOWN", location) as location
insert into CleanedTransactionStream;

@info(name = 'EnrichData')
from CleanedTransactionStream as c join UserTable as u
on c.userId == u.userId
select c.userId,
str:concat(u.firstName, " ", u.lastName) as userName,
transactionAmount,
location
insert into EnrichedTransactionStream;

@info(name = 'LoadData')
from EnrichedTransactionStream
insert into TransactionTable;
Scenario 2: Extract data from a database, perform a stateful transformation, and load data to Kafka
Let's consider a scenario where the head office of the Sweet Factory needs to analyze sales by location based on the latest transactions every five minutes. To do this, you need to poll the database with the sales transactions every five minutes. Before the aggregates are calculated, you also need to ensure that the database contains the latest transactions. You can perform periodic ETL via the WSO2 Stream Processor to achieve this as shown below.
Let's get started!
- First, you need to ensure that the information extracted for processing is based on the latest transactions. To do that, let's collect information relating to the latest transactions as follows.
You need a database table that, at any given time, contains information about which transactions were last processed. Therefore, let's start by adding the definition of this data table to the Siddhi application as follows.
-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

Here, you are creating the new TriggerStateTable table via the TriggerStateDB datasource. The key attribute in the table schema is the primary key, and its values identify the rows in the table.

The TriggerStateTable table you created needs to be polled every five minutes for the last processed ID. To do this, you can compare the last processed ID in the table with the last processed ID in a stream in which a new event is triggered every five minutes. To make this possible, let's define a trigger that triggers an event in an inferred stream every five minutes.

define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);
- To derive the latest transactions by comparing the last processed ID in the TriggerStateTable table and the TriggerStream inferred stream, let's create a Siddhi query as follows:

The TriggerStateTable table has information about the last processed ID (i.e., the value of the lastProcessedId attribute). To keep polling this table, let's join it with the TriggerStream stream in which events are entered every five minutes. The join can be performed as follows.

from TriggerStream as s right outer join TriggerStateTable as t

The events extracted based on the last processed ID need to be inserted into another stream so that they can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.

from TriggerStream as s right outer join TriggerStateTable as t
insert into DataRetrievalStream;

Now you need to specify the condition based on which the last processed ID is selected from the joined TriggerStream stream and TriggerStateTable table. As mentioned before, the purpose of the TriggerStream stream is to poll the table. Each new event in this stream is a request to poll the table, and once it is polled, the last processed ID derived is selected for further processing. If no value exists in the table for the lastProcessedId attribute (e.g., if the table is new), 0 must be used as the last processed ID by default. To do this, let's add a select clause with an ifThenElse condition as shown below.

from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;
The partially completed Siddhi application looks as follows.
Note that the query is named CollectLastProcessedId. It is recommended to name each query stating its purpose when there are multiple queries in the Siddhi application.

define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;
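The defaulting behaviour of the right outer join plus ifThenElse can be sketched in Python (a simplification for illustration; the table is modelled as a plain dict):

```python
def collect_last_processed_id(trigger_state):
    """On each trigger event, read the stored last-processed ID,
    defaulting to 0 when the table has no row yet -- mirroring the
    right outer join and ifThenElse in CollectLastProcessedId."""
    last_id = trigger_state.get("lastProcessedId")
    return last_id if last_id is not None else 0

print(collect_last_processed_id({}))                       # 0 (empty table)
print(collect_last_processed_id({"lastProcessedId": 42}))  # 42
```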
- To extract the data that needs to be processed, let's further update the Siddhi application as follows.
In the previous scenario, you stored the data to be processed in a table named TransactionTable. To access that data for this scenario, you need to include the same table definition in the Siddhi application that you are creating for this scenario as follows.

define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

- The data to be extracted for processing needs to be newer than the last processed transaction. To extract information based on this condition, let's add a Siddhi query named ExtractData as follows:

Transactions should be taken from the TransactionTable table only if their IDs are greater than the last processed ID of the event in the DataRetrievalStream stream. Therefore, let's join this stream and table to do this comparison and extract the required events.

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId

The data extracted needs to be directed to an output stream. Therefore, let's add an output stream named ExtractedDataStream that can be inferred.

from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
insert into ExtractedDataStream;

Now let's select the information to be inserted into the ExtractedDataStream stream for the extracted events.

from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
The partially completed Siddhi application now looks as follows.
define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

When the above query is executed, you have the latest transactions extracted and directed to the ExtractedDataStream stream. To indicate that these transactions are already considered for processing and therefore should not be selected again, you need to update the TriggerStateTable table. To do this, enter a query named UpdateLastProcessedId.

The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream in which the latest transactions are extracted every five minutes. To indicate this, add a from clause to the UpdateLastProcessedId query as follows.

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream

When selecting records to be updated or inserted into the table, the unique identity by which the records are identified should be the last processed ID. Therefore, let's add a select clause with the lastProcessedId attribute specified as the key.

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId

The record with the lastProcessedId key may already exist in the TriggerStateTable table. In such a scenario, the existing record should be updated with the latest details taken from the stream. If it does not already exist in the table, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";
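The update-or-insert (upsert) semantics can be sketched in Python, with the table modelled as a dict of rows keyed by the primary key column (an illustration only):

```python
def update_last_processed_id(table, transaction_id):
    """Upsert the row keyed "lastProcessedId": update it when present,
    insert it otherwise -- mirroring Siddhi's update or insert into clause."""
    table["lastProcessedId"] = {"key": "lastProcessedId",
                                "lastProcessedId": transaction_id}
    return table

trigger_state = {}  # table rows keyed by the primary key column
update_last_processed_id(trigger_state, 1001)  # inserts a new row
update_last_processed_id(trigger_state, 1007)  # updates the existing row
print(trigger_state["lastProcessedId"]["lastProcessedId"])  # 1007
```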
The partially created Siddhi application now looks as follows:
define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";

The extracted data now needs to be transformed via a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query named StatefulAggregation to carry out this analysis.

The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TransformedDataStream. Therefore, let's add the from and insert into clauses accordingly.

@info(name = 'StatefulAggregation')
from ExtractedDataStream
insert into TransformedDataStream

As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
insert into TransformedDataStream

The attributes in the ExtractedDataStream stream are transactionId, transactionLocation, and transactionAmount. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values of the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
insert into TransformedDataStream;

The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by location when presenting the results.

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
group by transactionLocation
insert into TransformedDataStream;
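The tumbling-window aggregation can be sketched in Python as a batch computation over the events collected in one five-minute window (a simplification for illustration; Siddhi manages the windowing itself):

```python
from collections import defaultdict

def aggregate_window(events):
    """Aggregate one window's worth of extracted events per location,
    mirroring sum()/avg() with group by transactionLocation."""
    amounts = defaultdict(list)
    for e in events:
        amounts[e["transactionLocation"]].append(e["transactionAmount"])
    return {
        loc: {"totalTransactionAmount": sum(v),
              "avgTransactionAmount": sum(v) / len(v)}
        for loc, v in amounts.items()
    }

window = [  # hypothetical events gathered during one five-minute batch
    {"transactionLocation": "CA, USA", "transactionAmount": 100.0},
    {"transactionLocation": "CA, USA", "transactionAmount": 300.0},
    {"transactionLocation": "London, UK", "transactionAmount": 50.0},
]
print(aggregate_window(window)["CA, USA"])
# {'totalTransactionAmount': 400.0, 'avgTransactionAmount': 200.0}
```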
The partially created Siddhi application now looks as follows.
define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
group by transactionLocation
insert into TransformedDataStream;

The information processed via the StatefulAggregation query and directed to the TransformedDataStream stream needs to be published via Kafka. To specify this, let's follow the steps below.

The events published are taken from the TransformedDataStream stream. Therefore, the sink configuration needs to be connected to this stream. To do this, first, let's add a definition for the TransformedDataStream stream, which is currently an inferred stream.

define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);

Now let's connect a sink definition where the transport type is Kafka as shown below.
-- Sink to load the data
@sink(type = 'kafka', bootstrap.servers = '...',
topic = '...', is.binary.message = '...',
@map(type = 'xml'))
define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);
Before saving the Siddhi application, add a name and a description to the Siddhi application as follows.
@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to Kafka')

Save the Siddhi application. The completed version looks as follows.

@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to Kafka')

define trigger TriggerStream at every 5 min;

-- Sink to load the data
@sink(type = 'kafka', bootstrap.servers = '...',
topic = '...', is.binary.message = '...',
@map(type = 'xml'))
define stream TransformedDataStream (transactionLocation string,
totalTransactionAmount double, avgTransactionAmount double,
transactionId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TriggerStateDB')
define table TriggerStateTable (key string, lastProcessedId long);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
sum(transactionAmount) as totalTransactionAmount,
avg(transactionAmount) as avgTransactionAmount,
transactionId
group by transactionLocation
insert into TransformedDataStream;