...
- First, you need to ensure that the information extracted for processing is based on the latest transactions. To do that, let's collect information relating to the latest transactions as follows.
You need a database table that, at any given time, contains information about which transactions were last processed. Therefore, let's start by adding the definition of this table to the Siddhi application as follows.
-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);
Here, you are creating a new table named TriggerStateTable in the same TransactionDataDB database that you created before starting this tutorial and in which you stored data in the previous scenario. The key attribute in the table schema is the primary key, and its values serve as the row IDs in the table.
The TriggerStateTable table you created needs to be polled every five minutes for the last processed ID. To do this, you can compare the last processed ID in the table with the last processed ID in a stream in which a new event is triggered every five minutes. To make this possible, let's define a trigger that fires an event into an inferred stream every five minutes.

define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);
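Note that you do not need to create the TriggerStateTable in MySQL yourself; the rdbms store is expected to create it (if it does not already exist) when the Siddhi application is deployed. The sketch below is only an assumption about the underlying setup: it assumes the TransactionDataDB datasource points to a MySQL database of the same name and that the default column naming is used, and shows how you could later inspect the stored state.

-- Minimal sketch (assumption: the TransactionDataDB datasource points to a MySQL database named TransactionDataDB)
CREATE DATABASE IF NOT EXISTS TransactionDataDB;

-- After the application has run, the last processed ID can be inspected as follows.
-- `key` is quoted because KEY is a reserved word in MySQL.
USE TransactionDataDB;
SELECT `key`, lastProcessedId FROM TriggerStateTable;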
- To derive the latest transactions by comparing the last processed ID in the TriggerStateTable table with the events in the TriggerStream inferred stream, let's create a Siddhi query as follows.
The TriggerStateTable table holds the last processed ID (i.e., the value of the lastProcessedId attribute). To keep polling this table, let's join it with the TriggerStream stream, into which an event arrives every five minutes. A right outer join is used so that an event is emitted even when the table is still empty. The join can be performed as follows.

from TriggerStream as s right outer join TriggerStateTable as t
The events extracted based on the last processed ID need to be inserted into another stream so that they can be used for further processing. Therefore, let's add the insert into clause with an inferred stream as follows.

from TriggerStream as s right outer join TriggerStateTable as t
insert into DataRetrievalStream;
Now you need to specify the condition based on which the last processed ID is selected from the joined TriggerStream stream and TriggerStateTable table.
As mentioned before, the purpose of the TriggerStream stream is to poll the TriggerStateTable table. Each new event in this stream is a request to poll the table, and once the table is polled, the last processed ID derived from it is selected for further processing. If no value exists in the table for the lastProcessedId attribute (e.g., if the table is new), 0 must be used as the last processed ID by default. To do this, let's add a select clause with an ifThenElse condition as shown below.

from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;
The partially completed Siddhi application looks as follows.
Tip: Note that the query is named CollectLastProcessedId. It is recommended to name each query after its purpose when there are multiple queries in the Siddhi application.

define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;
- To extract the data that needs to be processed, let's further update the Siddhi application as follows.
In the previous scenario, you stored the data to be processed in a table named TransactionTable. To access that data in this scenario, you need to include the same table definition in the Siddhi application that you are creating, as follows.

define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;
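The TransactionTable is created and populated by the previous scenario. If you want to try this scenario in isolation, the following MySQL sketch inserts a few rows directly; the column names follow the table definition above, the database name is the same assumption as before, and all values are purely illustrative.

-- Hypothetical sample rows for testing this scenario on its own (illustrative values only)
USE TransactionDataDB;
INSERT INTO TransactionTable (transactionId, userId, transactionAmount, transactionLocation)
VALUES (1, 1001, 2500.00, 'San Francisco'),
       (2, 1002,  150.75, 'London'),
       (3, 1001,  320.40, 'London');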
- The data to be extracted for processing needs to be newer than the transactions that were already processed. To extract information based on this condition, let's add a Siddhi query named ExtractData as follows.
Transactions should be taken from the TransactionTable table only if their transaction IDs are greater than the last processed ID carried by the event in the DataRetrievalStream stream. Therefore, let's join this stream and the table to make this comparison and extract the required events.

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
The extracted data needs to be directed to an output stream. Therefore, let's add an inferred output stream named ExtractedDataStream.

from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
insert into ExtractedDataStream;
Now let's select the information to be inserted into the ExtractedDataStream stream for the extracted events.

from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
The partially completed Siddhi application now looks as follows.
define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;
When the above query is executed, the latest transactions are extracted and directed to the ExtractedDataStream stream. To indicate that these transactions have already been considered for processing and therefore should not be selected again, you need to update the TriggerStateTable table. To do this, let's add a query named UpdateLastProcessedId.
The information with which the TriggerStateTable table is updated is taken from the ExtractedDataStream stream, into which the latest transactions are extracted every five minutes. To indicate this, add a from clause to the UpdateLastProcessedId query as follows.

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
When selecting records to be updated or inserted into the table, each record must be identified by its key. Therefore, let's add a select clause that uses the constant "lastProcessedId" as the key and the transaction ID as the new value of the lastProcessedId attribute. Because the key is always the same constant, the TriggerStateTable only ever holds a single row, which each batch of extracted events overwrites.

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
A record with this key may already exist in the TriggerStateTable table. In that case, the existing record should be updated with the latest details taken from the stream; if it does not exist, it should be inserted as a new record. To indicate this, let's add an update or insert clause as follows.

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";
The partially created Siddhi application now looks as follows:
define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";
The extracted data now needs to be transformed via a stateful aggregation. You need to analyze the sales for each location by calculating the total sales and the average sales per location every five minutes. Let's create a Siddhi query named StatefulAggregation to carry out this analysis.
The information to be analyzed is taken from the ExtractedDataStream stream, and the results are inserted into an output stream named TransformedDataStream. Therefore, let's add the from and insert into clauses accordingly.

@info(name = 'StatefulAggregation')
from ExtractedDataStream
insert into TransformedDataStream
As mentioned, the calculations need to be done every five minutes in a tumbling manner. Therefore, let's add a time batch window as follows.
@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
insert into TransformedDataStream
The attributes in the ExtractedDataStream stream are transactionId, transactionAmount, and transactionLocation. The values for the transactionLocation attribute need to be taken without any further processing. However, you need to derive the total transaction amount and the average transaction amount from the values of the transactionAmount attribute. This can be achieved via the sum() and avg() Siddhi functions. Therefore, let's add the select clause as follows.

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
    sum(transactionAmount) as totalTransactionAmount,
    avg(transactionAmount) as avgTransactionAmount,
    transactionId
insert into TransformedDataStream;
The analysis is carried out to evaluate the sales by location. Therefore, let's add a group by clause to group the calculations by location when presenting the results.

from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
    sum(transactionAmount) as totalTransactionAmount,
    avg(transactionAmount) as avgTransactionAmount,
    transactionId
group by transactionLocation
insert into TransformedDataStream;
The partially created Siddhi application now looks as follows.
define trigger TriggerStream at every 5 min;

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
    sum(transactionAmount) as totalTransactionAmount,
    avg(transactionAmount) as avgTransactionAmount,
    transactionId
group by transactionLocation
insert into TransformedDataStream;
The information processed via the StatefulAggregation query and directed to the TransformedDataStream stream needs to be published via Kafka in binary format. To specify this, let's follow the steps below.
The published events are taken from the TransformedDataStream stream. Therefore, the sink configuration needs to be connected to this stream. To do this, first let's add a definition for the TransformedDataStream stream, which is currently an inferred stream.

define stream TransformedDataStream (transactionLocation string,
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);
Now let's connect a sink definition where the transport type is Kafka, the map type is XML, and binary messaging is enabled via the is.binary.message parameter, as shown below.

-- Sink to load the data
@sink(type = 'kafka', bootstrap.servers = '...',
      topic = '...', is.binary.message = '...',
      @map(type = 'xml'))
define stream TransformedDataStream (transactionLocation string,
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);
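The ellipses above are placeholders that you must fill with the values of your own environment. As an illustration only (the broker address and topic name below are assumptions, not part of the tutorial), a filled-in sink could look like this:

-- Illustrative sketch: assumes a local Kafka broker at localhost:9092 and a
-- hypothetical topic named transaction-analysis; adjust these to your environment.
@sink(type = 'kafka', bootstrap.servers = 'localhost:9092',
      topic = 'transaction-analysis', is.binary.message = 'true',
      @map(type = 'xml'))
define stream TransformedDataStream (transactionLocation string,
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);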
Before saving the Siddhi application, add a name and a description to it as follows.

@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to kafka')
Save the Siddhi application. The completed version looks as follows.
@App:name('DB-to-Kafka-ETL')
@App:description('Extract data from database, perform stateful transformation, and load data to kafka')

define trigger TriggerStream at every 5 min;

-- Sink to load the data
@sink(type = 'kafka', bootstrap.servers = '...',
      topic = '...', is.binary.message = '...',
      @map(type = 'xml'))
define stream TransformedDataStream (transactionLocation string,
    totalTransactionAmount double, avgTransactionAmount double,
    transactionId long);

-- Store table
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TransactionTable (transactionId long, userId long, transactionAmount double, transactionLocation string);

-- MySQL Table to keep last processed transaction ID
@primaryKey('key')
@store(type = 'rdbms', datasource = 'TransactionDataDB')
define table TriggerStateTable (key string, lastProcessedId long);

@info(name = 'CollectLastProcessedId')
from TriggerStream as s right outer join TriggerStateTable as t
select ifThenElse(t.lastProcessedId is null, 0l, t.lastProcessedId) as lastProcessedId
insert into DataRetrievalStream;

@info(name = 'ExtractData')
from DataRetrievalStream as s join TransactionTable as t
on s.lastProcessedId < t.transactionId
select t.transactionId, t.transactionAmount, t.transactionLocation
insert into ExtractedDataStream;

@info(name = 'UpdateLastProcessedId')
from ExtractedDataStream
select "lastProcessedId" as key, transactionId as lastProcessedId
update or insert into TriggerStateTable
on TriggerStateTable.key == "lastProcessedId";

@info(name = 'StatefulAggregation')
from ExtractedDataStream#window.timeBatch(5 min)
select transactionLocation,
    sum(transactionAmount) as totalTransactionAmount,
    avg(transactionAmount) as avgTransactionAmount,
    transactionId
group by transactionLocation
insert into TransformedDataStream;