com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_link3' is unknown.

Capturing Changes from Data Stores

Introduction

In this tutorial, let’s understand how to capture changes from datastores as events and process them.

Let’s consider that there are two branches of the Sweet Factory named A and B. Sweet Bots insert a record into the database in the relevant branch after each batch of sweets is produced. The manager in the head office wants a name-wise total for all the categories of sweet produced in both the branches until now.

To understand the requirement, assume that the following quantities were reported from the two brances after the completion of a production run.

Branch A

Batch NoItemQuantity
A101Jaffa Cake100
A102Gingerbread102


Branch B

Batch NoItemQuantity
B101Jaffa Cake106
B102Gingerbread103

When the total production per batch for each sweet category is calculated, the manager in the head office expects to receive the following result.

ItemQuantity
Jaffa Cake206
Gingerbread205

Before you begin:

  • Try out the Integrating Datastores tutorial.
  • Set up the databases and the tables required for this tutorial as follows:

    It is assumed that you have already installed MySQL for the Integrating Datastores tutorial. If youy have skipped it, install MySQL as follows:

     Click here for MySQL Installation Instructions
    1. Download and install MySQL Server.

    2. Download the MySQL JDBC driver.

    3. Unzip the downloaded MySQL driver zipped archive, and copy the MySQL JDBC driver JAR (mysql-connector-java-x.x.xx-bin.jar) into the <SP_HOME>/lib directory.

    4. Enter the following command in a terminal/command window, where username is the username you want to use to access the databases.
      mysql -u username -p 
    5. When prompted, specify the password you are using to access the databases with the username you specified.


    1. Create two databases named productionA and productionB by issuing the following commands.

      CREATE DATABASE productionA;
      CREATE DATABASE productionB;
    2. In each database, create a table named SweetProduction by issuing the following commands.

      use productionA;
      CREATE TABLE SweetProduction (batchNo varchar (255) PRIMARY KEY, item varchar (255) NOT NULL, quantity int (255));
      use productionB;
      CREATE TABLE SweetProduction (batchNo varchar (255) PRIMARY KEY, item varchar (255) NOT NULL, quantity int (255));
  • Enable binary logging for MySQL.

  • Download and install the latest version of CDC Siddhi extension from the Siddhi Extensions page. For detailed instructions, see Downloading and Installing Siddhi Extensions.

To understand the CDC source type, see Siddhi-io-cdc.


Tutorial steps

Let's get started.

  1. Start the editor profile and sign in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it as TotalSweetProductionApp.

    @App:name('TotalSweetProductionApp')

  2. Each captured event needs to include an item and a quantity. This information is reported for both the branches. Therefore, let's define an input stream for each branch as follows.

    @App:name('TotalSweetProductionApp')

    define stream InputStreamA(item string, quantity int);

    define stream InputStreamB(item string, quantity int);
  3. To receive the change events from the database table, you need an event source of the cdc type. Let’s add it above each stream definition as shown below. 

    @App:name('TotalSweetProductionApp')

    @source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/productionA', username = 'alex', password = 'alex123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
    define stream InputStreamA(item string, quantity int);

    @source(type = 'cdc', url = 'jdbc:mysql://localhost:3406/productionB', username = 'smith', password = 'smith123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
    define stream InputStreamB(item string, quantity int);

    Here, branch A receives change events from the production A database, and branch B receives them from the production B database.

  4. The events from both input streams need to be available in one stream before the output can be processed. Therefore, let's define another stream for this purpose as follows.

    define stream SweetProductionStream (item string, quantity int);
  5. The expected output is the total production per sweet category. Therefore, let's define an output stream with the same attributes as the input streams.

    define stream SweetTotalStream(name string, totalProduction long);

  6. The output needs to be presented as logs in the console. Therefore, let's add a sink of the log type.

    @sink(type='log', prefix='Sweet Totals:')

    define stream SweetTotalStream(name string, totalProduction long);

  7. Let’s add a query as follows to take events from input stream A and send it to the SweetProductionStream stream.

    @info(name='SweetProductionQueryA')
    from InputStreamA
    select item, quantity
    insert into SweetProductionStream;

  8. To take events from input stream B and send it to the SweetProductionStream stream, let's add another query as follows.

    @info(name='SweetProductionQueryB')
    from InputStreamB
    select item, quantity
    insert into SweetProductionStream;

  9. Finally, let's add a query to process the events in the SweetProductionStream stream and then insert the the item and total quantity into the output stream.

    @info(name='SweetTotalQuery')
    from SweetProductionStream
    select item, sum(quantity) as totalProduction
    group by item
    insert into SweetTotalStream;

    Here, the sum() Siddhi function is applied to the quantity attribute of the SweetProductionStream stream to derive the total production reported by all the events from both the branches.

The Siddhi application is now ready to be saved. It looks as follows.

@App:name('TotalSweetProductionApp')

@source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/productionA', username = 'alex', password = 'alex123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
define stream InputStreamA(item string, quantity int);

@source(type = 'cdc', url = 'jdbc:mysql://localhost:3406/productionB', username = 'smith', password = 'smith123', table.name = 'SweetProduction', operation = 'insert', @map(type = 'keyvalue'))
define stream InputStreamB(item string, quantity int);

define stream SweetProductionStream (item string, quantity int);

@sink(type='log', prefix='Sweet Totals:')
define stream SweetTotalStream(item string, totalProduction long);

@info(name='SweetProductionQueryA')
from InputStreamA
select item, quantity
insert into SweetProductionStream;

@info(name='SweetProductionQueryB')
from InputStreamB
select item, quantity
insert into SweetProductionStream;

@info(name='SweetTotalQuery')
from SweetProductionStream
select item, sum(quantity) as totalProduction
group by item
insert into SweetTotalStream;

Generating the output

To generate an output from the TotalSweetProductionApp Siddhi application, follow the procedure below:

  1. Insert two records from branch A as follows:

    Use productionA;
    INSERT INTO SweetProduction (A101, Jaffa Cake, 100);
    INSERT INTO SweetProduction (A102, Gingerbread, 102);


    The following logs are displayed in the output console.

  2. Insert another two records from branch B as follows:

    Use productionB;
    INSERT INTO SweetProduction (B101, Jaffa Cake, 106)
    INSERT INTO SweetProduction (B102, Gingerbread, 103);


    The following is logged in the output console.

com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links2' is unknown.