Capturing Changes from Data Stores
Introduction
In this tutorial, let’s understand how to capture changes from datastores as events and process them.
Let’s consider that there are two branches of the Sweet Factory named A and B. Sweet Bots insert a record into the database in the relevant branch after each batch of sweets is produced. The manager in the head office wants the total quantity produced so far for each sweet category, across both branches.
To understand the requirement, assume that the following quantities were reported from the two branches after the completion of a production run.
Branch A
Batch No | Item | Quantity |
---|---|---|
A101 | Jaffa Cake | 100 |
A102 | Gingerbread | 102 |
Branch B
Batch No | Item | Quantity |
---|---|---|
B101 | Jaffa Cake | 106 |
B102 | Gingerbread | 103 |
When the total production for each sweet category is calculated across both branches, the manager in the head office expects to receive the following result.
Item | Quantity |
---|---|
Jaffa Cake | 206 |
Gingerbread | 205 |
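The expected result above can be checked with a short Python sketch. It only illustrates the arithmetic and is not part of the Siddhi application; the variable names and the total_per_item helper are hypothetical.

```python
from collections import defaultdict

# Batches reported by each branch: (batch number, item, quantity).
branch_a = [("A101", "Jaffa Cake", 100), ("A102", "Gingerbread", 102)]
branch_b = [("B101", "Jaffa Cake", 106), ("B102", "Gingerbread", 103)]


def total_per_item(*branches):
    """Sum the quantity of every batch, grouped by item, across all branches."""
    totals = defaultdict(int)
    for branch in branches:
        for _batch_no, item, quantity in branch:
            totals[item] += quantity
    return dict(totals)


totals = total_per_item(branch_a, branch_b)
for item, quantity in totals.items():
    print(f"{item} | {quantity}")
```

Grouping by item rather than by batch number is what lets batches from different branches contribute to the same total.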
Before you begin:
- Try out the Integrating Datastores tutorial.
Set up the databases and the tables required for this tutorial as follows:
It is assumed that you have already installed MySQL for the Integrating Datastores tutorial. If you have skipped it, install MySQL before you continue.
Create two databases named productionA and productionB by issuing the following commands.
CREATE DATABASE productionA;
CREATE DATABASE productionB;
In each database, create a table named SweetProduction by issuing the following commands.
use productionA;
CREATE TABLE SweetProduction (batchNo varchar (255) PRIMARY KEY, item varchar (255) NOT NULL, quantity int (255));
use productionB;
CREATE TABLE SweetProduction (batchNo varchar (255) PRIMARY KEY, item varchar (255) NOT NULL, quantity int (255));
Enable binary logging for MySQL.
- Download and install the latest version of the CDC Siddhi extension from the Siddhi Extensions page. For detailed instructions, see Downloading and Installing Siddhi Extensions.
To understand the CDC source type, see Siddhi-io-cdc.
Tutorial steps
Let's get started.
Start the editor profile and sign in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it TotalSweetProductionApp.
@App:name('TotalSweetProductionApp')
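Each captured event needs to include an item and a quantity. This information is reported for both the branches. Therefore, let's define an input stream for each branch as follows.
@App:name('TotalSweetProductionApp')
define stream InputStreamA(item string, quantity int);
define stream InputStreamB(item string, quantity int);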
To receive the change events from the database table, you need an event source of the cdc type. Let’s add it above each stream definition as shown below.
@App:name('TotalSweetProductionApp')
@source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/productionA', username = 'alex', password = 'alex123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
define stream InputStreamA(item string, quantity int);
@source(type = 'cdc', url = 'jdbc:mysql://localhost:3406/productionB', username = 'smith', password = 'smith123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
define stream InputStreamB(item string, quantity int);
Here, branch A receives change events from the productionA database, and branch B receives them from the productionB database.
The events from both input streams need to be available in one stream before the output can be processed. Therefore, let's define another stream for this purpose as follows.
define stream SweetProductionStream (item string, quantity int);
The expected output is the total production per sweet category. Therefore, let's define an output stream that carries the item and its total production.
define stream SweetTotalStream(item string, totalProduction long);
The output needs to be presented as logs in the console. Therefore, let's add a sink of the log type.
@sink(type='log', prefix='Sweet Totals:')
define stream SweetTotalStream(item string, totalProduction long);
Let’s add a query as follows to take events from input stream A and send them to the SweetProductionStream stream.
@info(name='SweetProductionQueryA')
from InputStreamA
select item, quantity
insert into SweetProductionStream;
To take events from input stream B and send them to the SweetProductionStream stream, let's add another query as follows.
@info(name='SweetProductionQueryB')
from InputStreamB
select item, quantity
insert into SweetProductionStream;
Finally, let's add a query to process the events in the SweetProductionStream stream and then insert the item and total quantity into the output stream.
@info(name='SweetTotalQuery')
from SweetProductionStream
select item, sum(quantity) as totalProduction
group by item
insert into SweetTotalStream;
Here, the sum() Siddhi function is applied to the quantity attribute of the SweetProductionStream stream to derive the total production reported by all the events from both the branches.
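Because the query above has no window, the total is cumulative, and a query of this form typically emits an updated total for each arriving event rather than a single final figure. The following Python sketch imitates that behavior under the assumption that the four inserts arrive in the order A101, A102, B101, B102. The running_totals helper and the merged_stream list are hypothetical names used for illustration and do not reflect how Siddhi is implemented.

```python
from collections import defaultdict


def running_totals(events):
    """Yield (item, total so far) for each incoming (item, quantity) event."""
    totals = defaultdict(int)
    for item, quantity in events:
        totals[item] += quantity
        yield item, totals[item]


# Assumed arrival order: the two branch A inserts, then the two branch B inserts.
merged_stream = [
    ("Jaffa Cake", 100),
    ("Gingerbread", 102),
    ("Jaffa Cake", 106),
    ("Gingerbread", 103),
]

outputs = list(running_totals(merged_stream))
for item, total in outputs:
    print(item, total)
```

Under that assumed order, the last value emitted for each item matches the totals the manager expects: 206 for Jaffa Cake and 205 for Gingerbread.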
The Siddhi application is now ready to be saved. It looks as follows.
@App:name('TotalSweetProductionApp')

@source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/productionA', username = 'alex', password = 'alex123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
define stream InputStreamA(item string, quantity int);

@source(type = 'cdc', url = 'jdbc:mysql://localhost:3406/productionB', username = 'smith', password = 'smith123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
define stream InputStreamB(item string, quantity int);

define stream SweetProductionStream (item string, quantity int);

@sink(type='log', prefix='Sweet Totals:')
define stream SweetTotalStream(item string, totalProduction long);

@info(name='SweetProductionQueryA')
from InputStreamA
select item, quantity
insert into SweetProductionStream;

@info(name='SweetProductionQueryB')
from InputStreamB
select item, quantity
insert into SweetProductionStream;

@info(name='SweetTotalQuery')
from SweetProductionStream
select item, sum(quantity) as totalProduction
group by item
insert into SweetTotalStream;
Generating the output
To generate an output from the TotalSweetProductionApp Siddhi application, follow the procedure below:
Insert two records from branch A as follows:
USE productionA;
INSERT INTO SweetProduction VALUES ('A101', 'Jaffa Cake', 100);
INSERT INTO SweetProduction VALUES ('A102', 'Gingerbread', 102);
The following logs are displayed in the output console.
Insert another two records from branch B as follows:
USE productionB;
INSERT INTO SweetProduction VALUES ('B101', 'Jaffa Cake', 106);
INSERT INTO SweetProduction VALUES ('B102', 'Gingerbread', 103);
The following is logged in the output console.