Capturing Changes from Data Stores


Introduction

In this tutorial, you learn how to capture changes in data stores as events and process them.

Consider two branches of the Sweet Factory, named A and B. After each batch of sweets is produced, a Sweet Bot inserts a record into the database of the relevant branch. The manager at the head office wants the total quantity produced so far for each sweet item, across both branches.

To illustrate the requirement, assume that the two branches report the following quantities after a production run.

Branch A

    Batch No    Item           Quantity
    A101        Jaffa Cake     100
    A102        Gingerbread    102


Branch B

    Batch No    Item           Quantity
    B101        Jaffa Cake     106
    B102        Gingerbread    103

When the quantities from both branches are totaled for each sweet item, the manager at the head office expects the following result.

    Item           Total Quantity
    Jaffa Cake     206
    Gingerbread    205

Before you begin:

  • Try out the Integrating Datastores tutorial.

  • Set up the databases and the tables required for this tutorial as follows:

     

    1. Create two databases named productionA and productionB by issuing the following commands.

      CREATE DATABASE productionA;
      CREATE DATABASE productionB;
    2. In each database, create a table named SweetProduction by issuing the following commands.

      USE productionA;
      CREATE TABLE SweetProduction (batchNo varchar(255) PRIMARY KEY, item varchar(255) NOT NULL, quantity int);
      USE productionB;
      CREATE TABLE SweetProduction (batchNo varchar(255) PRIMARY KEY, item varchar(255) NOT NULL, quantity int);
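  • Enable binary logging, in row-based format (binlog_format=ROW), on each MySQL server that hosts productionA or productionB. The cdc source reads change events from the MySQL binary log.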

  • Download and install the latest version of the CDC Siddhi extension (siddhi-io-cdc) from the Siddhi Extensions page. For detailed instructions, see Downloading and Installing Siddhi Extensions.

For details of the cdc source type and its parameters, see Siddhi-io-cdc.
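As a sketch of how the cdc source parameters fit together, the following fragment isolates the source that this tutorial attaches to branch A's input stream. The URL, user name, and password are the tutorial's sample values; replace them with the connection details of your own MySQL server.

```siddhi
-- cdc source for branch A:
--   url         JDBC URL of the MySQL database to monitor
--   username,
--   password    MySQL account that the source connects with
--   table.name  table whose changes are captured
--   operation   'insert' captures only newly inserted rows
--   @map        'keyvalue' maps the captured column values to the stream
--               attributes with the same names (item and quantity)
@source(type = 'cdc',
    url = 'jdbc:mysql://localhost:3306/productionA',
    username = 'alex', password = 'alex123',
    table.name = 'SweetProduction',
    operation = 'insert',
    @map(type = 'keyvalue'))
define stream InputStreamA (item string, quantity int);
```

Because the stream defines only item and quantity, the batchNo column of each captured row is not carried into the stream. Branch B uses the same kind of source, pointing to the productionB database.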


Tutorial steps

Let's get started.

  1. Start the editor profile and sign in to WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application, and name it TotalSweetProductionApp.

  2. Each captured event must include the item and the quantity, and both branches report this information. Therefore, let's define an input stream for each branch as follows.

  3. To receive the change events from the database table, you need an event source of the cdc type. Let’s add it above each stream definition as shown below. 

    Here, branch A receives change events from the productionA database, and branch B receives them from the productionB database.

  4. The events from both input streams need to be available in one stream before the output can be processed. Therefore, let's define another stream for this purpose as follows.

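  5. The expected output is the total production per sweet item. Therefore, let's define an output stream with an item attribute and a totalProduction attribute. The totalProduction attribute is of the long type because the sum() function returns a long value when it is applied to an int attribute.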

  6. The output needs to be presented as logs in the console. Therefore, let's add a sink of the log type.

  7. Let’s add a query as follows to take events from input stream A and send them to the SweetProductionStream stream.

  8. To take events from input stream B and send them to the SweetProductionStream stream, let's add another query as follows.

  9. Finally, let's add a query to process the events in the SweetProductionStream stream and insert the item and its total quantity into the output stream.

    Here, the sum() Siddhi function is applied to the quantity attribute of the SweetProductionStream stream to derive the total production reported by all the events from both the branches.
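The three queries from steps 7 to 9 are repeated below with comments, as a sketch of how the merge and the aggregation fit together. The stream and query names are the ones used in this tutorial's application.

```siddhi
-- Step 7: forward every change event captured from branch A into the merged stream.
@info(name='SweetProductionQueryA')
from InputStreamA
select item, quantity
insert into SweetProductionStream;

-- Step 8: forward branch B's events into the same stream, merging both branches.
@info(name='SweetProductionQueryB')
from InputStreamB
select item, quantity
insert into SweetProductionStream;

-- Step 9: keep a running total per item. No window is defined, so sum() covers
-- every event received since the application started, and each incoming event
-- emits the updated total for its item.
@info(name='SweetTotalQuery')
from SweetProductionStream
select item, sum(quantity) as totalProduction
group by item
insert into SweetTotalStream;
```

Inserting both branch streams into a single stream, rather than joining them, keeps the aggregation simple: the final query does not need to know which branch an event came from.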

The Siddhi application is now ready to be saved. It looks as follows.

@App:name('TotalSweetProductionApp')

@source(type = 'cdc', url = 'jdbc:mysql://localhost:3306/productionA',
    username = 'alex', password = 'alex123',
    table.name = 'SweetProduction', operation = 'insert',
    @map(type = 'keyvalue'))
define stream InputStreamA(item string, quantity int);

@source(type = 'cdc', url = 'jdbc:mysql://localhost:3406/productionB',
    username = 'smith', password = 'smith123',
    table.name = 'SweetProduction', operation = 'insert',
    @map(type = 'keyvalue'))
define stream InputStreamB(item string, quantity int);

define stream SweetProductionStream (item string, quantity int);

@sink(type='log', prefix='Sweet Totals:')
define stream SweetTotalStream(item string, totalProduction long);

@info(name='SweetProductionQueryA')
from InputStreamA
select item, quantity
insert into SweetProductionStream;

@info(name='SweetProductionQueryB')
from InputStreamB
select item, quantity
insert into SweetProductionStream;

@info(name='SweetTotalQuery')
from SweetProductionStream
select item, sum(quantity) as totalProduction
group by item
insert into SweetTotalStream;

Generating the output

To generate an output from the TotalSweetProductionApp Siddhi application, follow the procedure below:

  1. Insert two records from branch A as follows:

    USE productionA;
    INSERT INTO SweetProduction VALUES ('A101', 'Jaffa Cake', 100);
    INSERT INTO SweetProduction VALUES ('A102', 'Gingerbread', 102);


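    For each inserted record, the log sink prints the updated total for that item in the output console: Jaffa Cake with a total of 100, followed by Gingerbread with a total of 102.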

  2. Insert another two records from branch B as follows:

    USE productionB;
    INSERT INTO SweetProduction VALUES ('B101', 'Jaffa Cake', 106);
    INSERT INTO SweetProduction VALUES ('B102', 'Gingerbread', 103);


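    The log sink now prints Jaffa Cake with a total of 206 (100 + 106), followed by Gingerbread with a total of 205 (102 + 103), which matches the result the head office expects.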
