Integrating Datastores
Introduction
In the previous tutorials, you worked with events arriving in real time. Those tutorials only analyzed live events received over a specific period of time.
In this tutorial, let's understand how live data can work in conjunction with historic data, as is often required in real-world scenarios.
Let's consider the scenario for purchasing raw materials for the Sweet Factory. Each time a consignment of raw materials is delivered, the supplier sends an event in JSON format to the WSO2 SP instance of the Sweet Factory. The manager wants the latest event from the supplier recorded for report generation.
This tutorial covers the following topics:
Storing real-time data for later processing
Correlating real-time data with historic data
Manipulating static data based on real-time data
Before you begin:
The user scenarios covered in this tutorial are supported by a set of extensions implemented for WSO2 SP called stores.
A store can be defined as any structure that acts as a data store for both data definition and data manipulation. Supported store implementations in Stream Processor 4.0 include the following:
For more information on integrating data stores, please see the page on Storage Integration.
In this scenario, the events generated when the supplier delivers raw materials are stored in a MySQL database named SweetFactoryDB. You need to download and install MySQL and create this database before you try the tutorial steps.
Tutorial steps
Let's get started!
Start the editor and log in to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it ShipmentHistoryApp.
The information captured from the event sent by the supplier must include the name of the raw material, the name of the supplier, and the amount of material purchased. Let's add an input stream definition as follows to capture this information.
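A minimal sketch of such a definition, using the stream name and attribute types that appear in the complete application later in this tutorial:

```siddhi
-- Input stream carrying the raw material name, the supplier name,
-- and the amount purchased in each delivery.
define stream RawMaterialStream(name string, supplier string, amount double);
```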
The incoming event from the supplier is in JSON format. Therefore, to convert it to the Siddhi format so that it can be processed by WSO2 Stream Processor, let's add a source configuration that includes a JSON mapping as follows.
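With the HTTP source type used in the complete application, the source configuration would attach to the input stream roughly like this. No custom mapping is given, so the JSON mapper falls back to its default format, which appears to be why the sample requests later in this tutorial wrap each payload in an "event" object.

```siddhi
-- Receive events over HTTP and map each JSON payload to the stream attributes.
@source(type = 'http', @map(type = 'json'))
define stream RawMaterialStream(name string, supplier string, amount double);
```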
Let's define an output stream as follows.
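Assuming the output carries the same three attributes as the input, as it does in the complete application in this tutorial, the definition could be:

```siddhi
-- Output stream with the same attributes as the input stream.
define stream ShipmentDetailsStream(name string, supplier string, amount double);
```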
To forward events from the input stream to the output stream, let's add a simple query as follows.
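A sketch of such a pass-through query follows. It selects all three attributes so that the projected events match the output stream definition.

```siddhi
-- Pass every incoming event through unchanged.
-- Assumes RawMaterialStream and ShipmentDetailsStream are defined as in this tutorial.
from RawMaterialStream
select name, supplier, amount
insert into ShipmentDetailsStream;
```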
Now the Siddhi application looks as follows.

    @App:name('ShipmentHistoryApp')

    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);

    define stream ShipmentDetailsStream(name string, supplier string, amount double);

    from RawMaterialStream
    select name, supplier, amount
    insert into ShipmentDetailsStream;

In its current state, ShipmentHistoryApp can only forward all the events in RawMaterialStream to ShipmentDetailsStream. However, you need to save the incoming events in a store instead of passing them to a stream. Therefore, let's convert the output stream definition to a table definition by changing the define stream syntax to define table.
At this stage, the table is not yet bound to an external data store, so the events directed to it are not persisted. Instead, they are held in an in-memory table for later retrieval. The information stored in memory is no longer available once the server is restarted.
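As a sketch of the conversion described above, the application would look roughly like this at this intermediate stage. The table name ShipmentDetails and its attribute list are taken from the complete application shown later in this tutorial.

```siddhi
@App:name('ShipmentHistoryApp')

@source(type = 'http', @map(type = 'json'))
define stream RawMaterialStream(name string, supplier string, amount double);

-- The former output stream, now declared as a table.
-- No @store annotation is present yet, so the table lives in memory only.
define table ShipmentDetails(name string, supplier string, amount double);

-- The query now inserts into the table instead of a stream.
from RawMaterialStream
select name, supplier, amount
insert into ShipmentDetails;
```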
Compared to a stream, a table supports a set of additional annotations that enable it to leverage additional functionality offered by data stores, such as unique keys and indexes.
A unique key (also referred to as a primary key) is useful for uniquely identifying records. Fields set as unique keys are not allowed to have duplicate values within the data store. Let's set a unique key on the name attribute using the @PrimaryKey annotation.
The purchase records stored in the table need to be indexed by the supplier. To do this, you need to specify that the supplier attribute is an index attribute using the @index annotation.
The @index annotation is used for specifying secondary indexes. This is useful for data retrieval scenarios. If the underlying data storage mechanism (e.g., RDBMS) supports secondary indexes, the fields specified here are indexed at the data store level itself.
Let's specify a store of the RDBMS type for the ShipmentDetails table to bind RDBMS data storage mechanisms to it. To do this, you need to use the @store annotation. Similar to the @sink and @source annotations that you have already used in these tutorials, the properties used within the annotation vary depending on the type of the store.
The RDBMS store has a set of properties that must be set in order to create the connection with the underlying database instance. These include the JDBC URL of the database, the username, and the password. Let's add values to these properties so that the store definition becomes syntactically complete.
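The three annotations discussed above can be sketched together on the table definition as follows. The annotation values are the ones used in this tutorial's complete application. The JDBC URL, username, and password are example values and would need to match your own MySQL setup.

```siddhi
-- @primaryKey: 'name' uniquely identifies each record.
-- @index: 'supplier' is declared as a secondary index for lookups by supplier.
-- @store: binds the table to the SweetFactoryDB MySQL database over JDBC
--         (example credentials only).
@primaryKey('name')
@index('supplier')
@store(type='rdbms',
       jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB",
       username="root",
       password="root",
       jdbc.driver.name="com.mysql.jdbc.Driver")
define table ShipmentDetails(name string, supplier string, amount double);
```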
Now the updated Siddhi application is as follows:

    @App:name('ShipmentHistoryApp')

    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);

    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms',
           jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB",
           username="root",
           password="root",
           jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetails(name string, supplier string, amount double);

    from RawMaterialStream
    select name, supplier, amount
    insert into ShipmentDetails;

The factory manager needs to see the latest shipment for each raw material. If a record for a particular raw material does not already exist in the database, it must be added as a new entry. If it already exists, the previous record must be overwritten by the new incoming record.
This can be considered an update or insert scenario. In Siddhi stores, the update or insert into directive can be used for this purpose in place of the normal insert into command. To carry out this operation, let's replace insert into with update or insert into in the query.
On its own, this change does not specify the criteria by which Siddhi can identify whether a record already exists in the database. In this scenario, a record needs to be updated when its name attribute (which is the primary key of the table) matches that of a pre-existing record. This can be done by adding the condition on ShipmentDetails.name == name to the query.
The completed application is as follows.

    @App:name('ShipmentHistoryApp')

    @source(type = 'http', @map(type = 'json'))
    define stream RawMaterialStream(name string, supplier string, amount double);

    @primaryKey('name')
    @index('supplier')
    @store(type='rdbms',
           jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB",
           username="root",
           password="root",
           jdbc.driver.name="com.mysql.jdbc.Driver")
    define table ShipmentDetails(name string, supplier string, amount double);

    from RawMaterialStream
    select name, supplier, amount
    update or insert into ShipmentDetails
        on ShipmentDetails.name == name;

To store some data in the ShipmentDetails table, issue the following cURL commands:

    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "Flour", "supplier": "Acme", "amount": 460.0 } }'

    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "Sugar", "supplier": "Indigo6", "amount": 272.0 } }'

    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "Honey", "supplier": "The BeeGees", "amount": 9.0 } }'

    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "Food Coloring", "supplier": "Wadjet Food Products", "amount": 30.0 } }'

    curl -X POST \
      http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \
      -H 'content-type: application/json' \
      -d '{ "event": { "name": "Chocolate Chip", "supplier": "Larkspur Landing", "amount": 34.0 } }'

Once the events are sent, the event table is updated to be similar to the example shown below.

    name             supplier               amount
    Flour            Acme                   460.0
    Sugar            Indigo6                272.0
    Honey            The BeeGees            9.0
    Food Coloring    Wadjet Food Products   30.0
    Chocolate Chip   Larkspur Landing       34.0

The details in the store can be retrieved through the Store API as well as by deploying the Siddhi application in a worker node. To get the above details, issue the following cURL command.

    curl -X POST \
      http://localhost:9090/stores/query \
      -u admin:admin \
      -H 'content-type: application/json' \
      -d '{ "appName" : "ShipmentHistoryApp", "query" : "from ShipmentDetails select *" }'

The body of this request contains two parameters, which are appName and query.
appName: This must be the same as the name of the deployed Siddhi application. In this scenario, it is ShipmentHistoryApp.
query: Here, a valid select Siddhi query that includes the table name must be specified. In this scenario, the following query, which includes the ShipmentDetails table created earlier, is specified.

    from ShipmentDetails select *

This retrieves all the data in the ShipmentDetails store. Further, Siddhi filters such as having can be applied in order to filter data.
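As a sketch of such a filter, the store query below would return only the shipments above a given amount. The threshold of 100.0 is a hypothetical value chosen for illustration and does not come from the tutorial.

```siddhi
-- Hypothetical threshold: keep only records whose amount exceeds 100.0.
from ShipmentDetails
select name, supplier, amount
having amount > 100.0
```

To use it, the query text (without the comment line, and on a single line) would replace from ShipmentDetails select * as the value of the query parameter in the request body. With the five sample events sent earlier, it should match the Flour and Sugar records.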