Table of Contents | ||||
---|---|---|---|---|
|
...
Tip | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
| ||||||||||||
For more information on integrating data stores, please see the page on Storage Integration.
|
...
- Start the editor, and login to the WSO2 Stream Processor Studio. Then open a new Siddhi file to write a new Siddhi application. You can name it as
ShipmentHistoryApp
. The information captured from the event sent by the supplier must include the name of the raw material, the name of the supplier and the amount of material purchased. Let's add an input stream definition as follows to capture this information.
Panel define stream RawMaterialStream(name string, supplier string, amount double);
The incoming event from the supplier is in JSON format. Therefore, to convert it to the Siddhi format so that it can be processed by WSO2 Stream Processor, let's add a source configuration that includes a JSON mapping as follows.
Panel @source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(name string, supplier string, amount double);
Let's define an output stream as follows.
Panel define stream ShipmentDetailsStream(name string, supplier string, amount double);
To forward events from the input steam to the output stream, let's add a simple query as follows.
Panel from RawMaterialStream select name, amount insert into ShipmentDetailsStream;
Now the Siddhi application looks as follows.
Code Block language sql @App:name('ShipmentHistoryApp') @source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(name string, supplier string, amount double); define stream ShipmentDetailsStream(name string, supplier string, amount double); from RawMaterialStream select name, amount insert into ShipmentDetailsStream;
Info The Stream Processor Studio indicates a syntax error at this stage because the
ShipmentDetailsStream
has an additional attribute other than the attributes included in the select clause. Ignore this error because the Siddhi application is still in an incomplete state. This error is corrected in the next steps.The
ShipmentHistoryApp
in its current state can only forward all the events in theRawMaterialStream
to theShipmentDetailsStream
. However, you need to save the incoming events in a store instead of passing them to a stream. Therefore, let's convert the output stream definition to a store definition by changing thedefine stream
syntax todefine table
.Panel define table ShipmentDetails(name string, supplier string, amount double);
When the events are directed to a data store, they are not persisted, but they are stored within an in-memory table for later retrieval. The information stored in-memory is no longer available once the server is restarted.
Compared to a stream, a table supports a set of additional annotations that enables it to leverage on additional functionalities offered by data stores, such as unique keys and indexes.
A unique key (also referred to as primary key) is useful for uniquely identifying records. Fields set as unique keys are allowed to have duplicate values within the data store. Let's set a unique key using the
@PrimaryKey
annotation, as shown below.Panel @primaryKey('name')
define table ShipmentDetails(name string, supplier string, amount double);
The purchase records stored in the table need to be indexed by the supplier. To do this, you need to specify that the
supplier
attribute is an index attribute as shown below.The
@index
annotation is used for specifying secondary indexes. This is useful for data retrieval scenarios. If the underlying data storage mechanism (e.g., RDBMS) supports secondary indexes, the fields specified here are indexed in the data store-level itself.Panel @primaryKey('name')
@index('supplier')
define table ShipmentDetails(name string, supplier string, amount double);
Let's specify a store of the RDBMS type to the
ShipmentDetails
table to bind RDBMS data storage mechanisms to it.
To do this, you need to use the@store
annotation. Similar to the@sink
and the@source
annotations that you have already used in these tutorials, the properties used within the annotation vary depending on the type of the store.Panel @primaryKey('name')
@index('supplier')
@store(type='rdbms')
define table ShipmentDetails(name string, supplier string, amount double);
The RDBMS store has a set of properties that are required to be set in order to create the connection with the underlying DB instance. These include the JDBC URL of the database, the username, password and such. Let's add values to these properties so that the store definition becomes syntactically complete.
Panel @primaryKey('name')
@index('supplier')
@store(type='rdbms', jdbc.url="jdbc: mysql://localhost:3306/SweetFactoryDB ", username="root", password="root" , jdbc.driver.name ="com.mysql.jdbc.Driver")
define table ShipmentDetails(name string, supplier string, amount double);Now the updated Siddhi application is as follows:
Code Block language sql @App:name('ShipmentHistoryApp') @source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(name string, supplier string, amount double); @primaryKey('name') @index('supplier') @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver") define table ShipmentDetails(name string, supplier string, amount double); from RawMaterialStream select name, supplier, amount insert into ShipmentDetails;
The factory manager needs to see the latest shipments for each raw material. If a record for a particular raw material does not already exist in the database, it must be added as a new entry. If it already exists, its previous must be overwritten by the new incoming record.
This can be considered as an update or insert scenario. In Siddhi stores, the update or insert into directive can be used for this purpose in place of the normal insert into command. In order to carry out this operation, let's update the query as follows:Panel from RawMaterialStream select name, supplier, amount update or insert into ShipmentDetails;
In the above query, the criteria based on which Siddhi can identify whether a record already exists in the database is not specified. In this scenario, we can specify that a record needs to be updated when the
name
attribute (which is the primary key of the table) matches any pre-existing record. This can be done by adding a condition to the query as shown below.Panel from RawMaterialStream
select name, supplier, amount
insert into ShipmentDetails
on ShipmentDetailTables.name == name;
The completed query is as follows.
Code Block language sql @App:name('ShipmentHistoryApp') @source(type = 'http', @map(type = 'json')) define stream RawMaterialStream(name string, supplier string, amount double); @primaryKey('name') @index('supplier') @store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/SweetFactoryDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver") define table ShipmentDetails(name string, supplier string, amount double); from RawMaterialStream select name, supplier, amount update or insert into ShipmentDetails on ShipmentDetails.name == name;
To store some data in the
ShipmentDetails
table, issue the following cURL commands:Code Block curl -X POST \ http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Flour", "supplier": "Acme", "amount": 460.0 } }'
Code Block curl -X POST \ http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Sugar", "supplier": "Indigo6", "amount": 272.0 } }'
Code Block curl -X POST \ http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Honey", "supplier": "The BeeGees", "amount": 9.0 } }'
Code Block curl -X POST \ http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Food Coloring", "supplier": "Wadjet Food Products", "amount": 30.0 } }'
Code Block curl -X POST \ http://0.0.0.0:8280/ShipmentHistoryApp/RawMaterialStream \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Chocolate Chip", "supplier": "Larkspur Landing", "amount": 34.0 } }'
Once the events are sent, the event table is updated to be similar to the example shown below.
Name Supplier Amount Flour Acme 460 Sugar Indigo6 272 Honey The BeeGees
9 Food Coloring Wadjet Food Products 30 Chocolate Chip Larkspur Landing 34
The details in the store can be retrieved through the Store API as well as by deploying the Siddhi application in a worker node. To get the above details, issue the following cURL command.Info In order to issue the following command, the server must be started in the worker node.
Code Block curl -X POST \ http://localhost:90907370/stores/query \ -u admin:admin \ -H 'content-type: application/json' \ -d '{ "appName" : "ShipmentHistoryApp", "query" : "from ShipmentDetails select *" }'
The body of this request contains two parameters, which are
appName
andquery
.appName
: This must be same as the name of the Siddhi application deployed In this scenario, it isShipmentHistoryApp
.query
: Here, a validselect
Siddhi query that includes the table name must be specified. In this scenario, the following query which includes theShipmentDetails
table created in this scenario is specified.from ShipmentDetails select *
This retrieves all the data in the
ShipmentDetails
store. Further, Siddhi filters such ashaving
can be applied in order to filter data.