Consuming Events
Introduction
In the previous tutorial, you covered how to create a simple Siddhi application and how to simulate virtual events to it.
In this tutorial, let's look at how events are received in a real world scenario, and how to use the event capturing framework of WSO2 SP to receive incoming events even when their format is different to the format specified within your Siddhi application.
Let's consider a scenario where the events generated by the Sweet Bots are in the JSON format, and they are sent via HTTP calls. The manager requires them to be received at a specific endpoint that has a specific URL.
Before you begin:
- This tutorial uses the same Siddhi application that is created and deployed in Tutorial 1: Creating a Simple Siddhi Application. Therefore, it is recommended that you try Tutorial 1 first.
- Sources are the type of extensions that are used in the WSO2 Stream Processor to indicate to the runtime that events would be arriving from the defined endpoint or entity. WSO2 Stream Processor supports many different Source types out of the box, which include HTTP, TCP, Kafka etc. For more information about the sources supported for WSO2 SP, see Collecting Events.
Tutorial steps
Let's get started!
To understand the difference between receiving events with default mapping and receiving them with custom mapping, this section is divided into the the following parts.
Configure Siddhi application to receive HTTP events
In this section, let's see how to add the required configurations to receive events via HTTP in JSON format.
Let's open theÂ
SweetTotalsApp
 Siddhi application that you created in the previous tutorial. It currently looks as follows.@App:name('SweetTotalApp') define stream SweetProductionStream (name string, amount long); @sink(type='log', prefix='Sweet Totals:') define stream SweetTotalStream(name string, totalProduction long); @info(name='SweetTotalQuery') from SweetProductionStream select name, sum(amount) as totalProduction group by name insert into SweetTotalStream;
To receive the events generated by the Sweet Bots via HTTP calls, you need an event source of theÂ
http
 type. Let's add it as shown below.@source(type='http')
The HTTP events need to be received at a specific endpoint. Let's add the URL of this endpoint to the HTTP source configuration as follows:
@source(type='http', receiver.url='http://localhost:5005/SweetProductionEP')
The incoming messages generated by Sweet Bots are in the JSON format. You require a mapping configuration to transform them into a format that can be processed by WSO2 SP. Therefore, let's add an annotation for JSON mapping to the HTTP source as shown below.
For the complete list of mapping types supported for WSO2 SP, see Collecting Events - Event Format.
@source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json'))
Now theÂ
SweetTotalsApp
Siddhi application looks as follows.@App:name('SweetTotalApp') @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json')) define stream SweetProductionStream (name string, amount long); @sink(type='log', prefix='Sweet Totals:') define stream SweetTotalStream(name string, totalProduction long); @info(name='SweetTotalQuery') from SweetProductionStream select name, sum(amount) as totalProduction group by name insert into SweetTotalStream;
- Save theÂ
SweetTotalsApp
 Siddhi application. Then click the following icon to start it so that it can process the events you send.
 To see whether the Siddhi application functions as expected, let's send two events with the following information.
The input events generated by the Sweet Bots are in the JSON format. Therefore, the format is similar to the sample shown below.
{ "event": { "name": "Jaffa Cake", "amount": 10 } }
In order to send the two input events to WSO2 SP in the format shown above, issue the following two cURL commands.
curl -X POST \ http://localhost:5005/SweetProductionEP \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Jaffa Cake", "amount": 10 } }'
curl -X POST \ http://localhost:5005/SweetProductionEP \ -H 'content-type: application/json' \ -d '{ "event": { "name": "Jaffa Cake", "amount": 15 } }'
The two events sent contains the following information.
Input Event No Name Amount 1 Jaffa Cake
10
2 Jaffa Cake
15 As a result, two output events are expected to be generated with the following information.
Output Event No Name Amount 1 Jaffa Cake
10
2 Jaffa Cake
25 This generates the following output log in the CLI.
Add custom mapping
The previous step assumes that the message body of the events generated by Sweet Bots exactly match the schema of the SweetProductionStream
 input stream that is defined in your Siddhi application. Now let's consider a scenario where the attributes of the incoming JSON events are different to that of the input stream. Their format is as given in the sample below:
{ "sweet": "Jaffa Cake", "batch": { "batch id": "batch1", "count": 10 } }
From this event, you only need to extract the values for sweet
and count
(which is nested under batch
) attributes. Therefore, let's update the @map
annotation as follows:
- To allow the Siddhi application to identify the elements that need to be extracted from the JSON message, add anÂ
@attributes
 annotation to the mapping configuration as follows.@map(type = 'json', @attributes(...))
The JSON mapper supportsÂ
JsonPath
. Therefore, the values for the attributes annotation can beÂJsonPath
 expressions. TheÂJsonPath
 expressions for the name of the sweet and its total production are as follows.Stream Attribute Name JSON Event Attribute Name JsonPath
Expressionname
sweet
$.sweet
amount
count
$.batch.count
Based on the aboveÂ
JsonPath
 expressions you identified, you can update theÂ@attributes
 annotation as follows.@attributes(name = '$.sweet', amount = '$.batch.count')
Once thie above substeps are completed, the mapping configuration looks as follows.
@map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')
The complete Siddhi application looks as follows.
@App:name('SweetTotalApp') @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count'))) define stream SweetProductionStream (name string, amount long); @sink(type='log', prefix='Sweet Totals:') define stream SweetTotalStream(name string, totalProduction long); @info(name='SweetTotalQuery') from SweetProductionStream select name, sum(amount) as totalProduction group by name insert into SweetTotalStream;
- Let's save the changes you made toÂ
SweetTotalsApp
 Siddhi application. Then click the following icon to start it so that it can process the events you send.
 Now you can issue the following cURL commands to send the two incoming events as sent by the Sweet Bots.
curl -X POST \ http://localhost:5005/SweetProductionEP \ -H 'content-type: application/json' \ -d '{ "sweet": "Jaffa Cake", "batch": { "batch id": "batch1", "count": 10 } }'
curl -X POST \ http://localhost:5005/SweetProductionEP \ -H 'content-type: application/json' \ -d '{ "sweet": "Jaffa Cake", "batch": { "batch id": "batch1", "count": 15 } }'
As a result, you should see the same output in the CLI logs and you saw when you carried out step 6.