WSO2 Stream Processor (SP) is a lightweight, streaming SQL-based stream processing platform that allows you to collect events, analyze them in real time, identify patterns, map their impacts, and communicate the results within milliseconds. It is powered by Siddhi, which makes it highly performant.
First, let's understand the following concepts that are used in this guide:
Stream Processing and Complex Event Processing Overview
Let's understand what an event is through an example. If we consider the transactions carried out via an ATM as a data stream, one withdrawal from it can be considered an event. This event contains data about the amount, time, account number etc. Many such transactions form a stream.
Stream processing engines allow you to create a processing graph and inject events into it. Each operator processes the events it receives and sends them to the next operator.
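To make the idea of a processing graph concrete, here is a minimal Python sketch (not Siddhi code) that chains two operators over the ATM example. The operator names `filter_op` and `map_op`, the event fields, and the 500 threshold are all hypothetical and chosen only for illustration.

```python
def filter_op(predicate, upstream):
    """Operator that forwards only the events matching the predicate."""
    for event in upstream:
        if predicate(event):
            yield event


def map_op(fn, upstream):
    """Operator that transforms each event before passing it on."""
    for event in upstream:
        yield fn(event)


# Hypothetical ATM withdrawal events; each dict is one event in the stream.
withdrawals = [
    {"account": "A1", "amount": 50},
    {"account": "A2", "amount": 900},
    {"account": "A1", "amount": 1200},
]

# Build the graph: source -> filter (large withdrawals) -> map (add a flag).
large = filter_op(lambda e: e["amount"] >= 500, iter(withdrawals))
flagged = map_op(lambda e: {**e, "flag": "LARGE"}, large)

# Pull events through the graph one by one.
results = list(flagged)
for event in results:
    print(event)
```

Each operator only sees one event at a time and hands its output to the next one, which is the behavior the paragraph above describes.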
A complex event is an event that summarizes, represents, or denotes a set of other events. Complex Event Processing is a subset of Stream Processing that involves analyzing multiple streams of events in real time, recognizing particular sequences or patterns across streams, and inferring a business-significant event from correlated events.
The stream processing capabilities of WSO2 SP allow you to capture high-volume data flows, process them in real time, and present the results in a streaming manner. Its complex event processing capabilities detect patterns and trends for decision making via the Patterns and Sequences supported by Siddhi.
Siddhi overview
WSO2 SP uses the Siddhi query language to write the processing logic for its Siddhi applications. Siddhi can:
- Accept event inputs from many different types of sources
- Process them to generate insights
- Publish them to many types of sinks
To use Siddhi, you need to write the processing logic as a Siddhi Application in the Siddhi Streaming SQL language. After writing and starting a Siddhi application, it:
- Takes data one-by-one as events
- Processes the data in each event
- Generates new high-level events based on the processing done so far
- Sends newly generated events as the output to streams
Before you begin:
- Install Oracle Java SE Development Kit (JDK) version 1.8.
- Set the JAVA_HOME environment variable.
- Download the latest WSO2 Stream Processor.
- Extract the downloaded zip and navigate to the <SP_HOME>/bin directory (<SP_HOME> is the extracted directory).
- Issue one of the following commands to start the WSO2 Stream Processor Studio.
  - For Windows: editor.bat
  - For Linux: ./editor.sh
Once the WSO2 SP server has started successfully, a confirmation is logged in the CLI. The startup logs also print the Stream Processor Studio URL.
Scenario
In this scenario, you are creating an application for Shipping Wave, a fictitious large-scale shipping company. Smith, the cargo manager, needs to keep track of the total weight of cargo loaded onto a ship at any given time. Measuring the weight of a cargo box when it is loaded onto the ship is considered an event.
Let's get started! You can write a simple Siddhi application to calculate the total weight with each cargo box loaded to the ship by following the steps below.
Step 1: Create a Siddhi application
- Access the Stream Processor Studio via the http://<HOST_NAME>:<EDITOR_PORT>/editor URL. The default URL is http://localhost:9390/editor. The Stream Processor Studio opens.
- Enter a name for your Siddhi application. In this scenario, let's name the application CargoWeightApp as shown below.

      @App:name("CargoWeightApp")
- Define the input stream. The stream needs to have a name and a schema defining the data that each incoming event should contain. The event data attributes are expressed as name and type pairs. In this example:
  - The name of the input stream: CargoStream
  - A name to refer to the data in each event: weight
  - Type of the data received as weight: int

      define stream CargoStream (weight int);
- Define an output stream. This has the same info as the previous definition with an additional totalWeight attribute that contains the total weight calculated so far. Here, we need to add a sink configuration to log the events from the OutputStream so that we can observe the output values.

  A sink specifies the method of publishing streams to external systems via Siddhi. In this scenario, the sink added is of the log type, and it publishes output streams as logs in the CLI.

      @sink(type='log', prefix='LOGGER')
      define stream OutputStream(weight int, totalWeight long);
- Enter a Siddhi query that defines the following:
  - A name for the query (i.e., CargoWeightQuery)
  - The input stream from which the events to be processed are taken (i.e., CargoStream)
  - The data that needs to be sent to the output stream (i.e., weight and totalWeight)
  - How the output needs to be calculated (i.e., by calculating the sum of the weight of all the events)
  - The stream to which the output needs to be sent (i.e., OutputStream)

  This query is as follows:

      @info(name='CargoWeightQuery')
      from CargoStream
      select weight, sum(weight) as totalWeight
      insert into OutputStream;

  The completed Siddhi file is as follows:

      @App:name("CargoWeightApp")

      define stream CargoStream (weight int);

      @sink(type='log', prefix='LOGGER')
      define stream OutputStream(weight int, totalWeight long);

      @info(name='CargoWeightQuery')
      from CargoStream
      select weight, sum(weight) as totalWeight
      insert into OutputStream;
- To save the Siddhi file, click File => Save. This opens the Save to Workspace dialog box. Click Save to save this file in the <SP_HOME>/wso2/editor/deployment/workspace directory (which is the default location where Siddhi applications are saved).
Step 2: Simulate events
The Stream Processor Studio has built-in support for simulating events. To test whether the CargoWeightApp application you created works as expected, let's simulate some events by following the steps given below.
- To start the CargoWeightApp Siddhi application, click the play button. If the application starts successfully, a confirmation is logged in the Stream Processor Studio console.
- Click the event simulation icon in the Stream Processor Studio to open the event simulation panel.
- In the Single Simulation tab of the panel, select values as follows:

Field | Value |
---|---|
Siddhi App Name | CargoWeightApp |
Stream Name | CargoStream |
- As a result, the weight attribute of the CargoStream stream is displayed. In the weight field, enter 1000 and click Start to start the Siddhi application. Then click Send to send the event. The event is logged in the CLI.
- Send five more events with the following values. The events are logged in the CLI.

Event No | Weight |
---|---|
1 | 2000 |
2 | 1500 |
3 | 2000 |
4 | 3000 |
5 | 1000 |
Each new weight is added to the running total. Therefore, after all six events are sent, the total weight is 10500.
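The arithmetic behind that total can be checked with a short Python sketch. This is not Siddhi code; it only mimics what `sum(weight)` does in the query when no window is applied, and the function name `running_totals` is hypothetical.

```python
def running_totals(weights):
    """Return (weight, totalWeight) pairs, one per incoming event."""
    total = 0
    outputs = []
    for weight in weights:
        total += weight  # only the running sum is kept, not the events
        outputs.append((weight, total))
    return outputs


# The first event (1000) followed by the five additional events.
weights = [1000, 2000, 1500, 2000, 3000, 1000]
outputs = running_totals(weights)

for weight, total in outputs:
    print(f"weight={weight}, totalWeight={total}")
```

The totalWeight values grow as 1000, 3000, 4500, 6500, 9500, and finally 10500.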
Step 3: Edit Siddhi application to perform temporal processing
This section demonstrates how to carry out temporal window processing with Siddhi.
In the previous scenario, you carried out processing by keeping only the running sum value in memory. No events were stored during this process.
Window processing is a method that allows us to store some events in memory for a given period or for a specific number of events so that we can perform operations such as calculating the average or maximum of the values within them.
Let's consider that Smith, the cargo manager, has an additional requirement: each time a new cargo box is loaded, calculate the average weight of the last three cargo boxes loaded in order to balance the weight across the ship. Here, we are considering a window that consists of three events.
To achieve this, edit the Siddhi application by following the steps below:
- Add a new attribute named averageWeight to the definition of the OutputStream stream so that each output event presents the average weight in addition to the weight of the new box loaded and the total weight.

      define stream OutputStream(weight int, totalWeight long, averageWeight double);
- To specify how to calculate the average weight, apply the avg Siddhi function to the weight attribute in the select statement as shown below. This indicates that the average is calculated for the weight attribute of incoming events.

      select weight, sum(weight) as totalWeight, avg(weight) as averageWeight
- To specify that the calculations performed by this query with each event must be applied only to the last three events received, apply a length window to the input stream as shown below.

      from CargoStream#window.length(3)

  This window applies to all the calculations performed for the events taken from the CargoStream stream. Therefore, adding this window also results in the total weight being calculated based on the last three events.
The completed query is as follows.

    @info(name='CargoWeightQuery')
    from CargoStream#window.length(3)
    select weight, sum(weight) as totalWeight, avg(weight) as averageWeight
    insert into OutputStream;
The complete CargoWeightApp Siddhi application is as follows.

    @App:name("CargoWeightApp")

    define stream CargoStream (weight int);

    @sink(type='log', prefix='LOGGER')
    define stream OutputStream(weight int, totalWeight long, averageWeight double);

    @info(name='CargoWeightQuery')
    from CargoStream#window.length(3)
    select weight, sum(weight) as totalWeight, avg(weight) as averageWeight
    insert into OutputStream;
Step 4: Simulate events for the edited Siddhi application
In this step, let's start the edited Siddhi application and simulate the same six events that you simulated in Step 2.
Event No | Weight |
---|---|
1 | 1000 |
2 | 2000 |
3 | 1500 |
4 | 2000 |
5 | 3000 |
6 | 1000 |
The output generated is logged as shown below.
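To reason about the values you should expect, the following Python sketch (not Siddhi code) mimics a length window of three events. It assumes the window slides one event at a time, that the oldest event is dropped before the aggregates are computed for the new one, and that only the aggregate for each arriving event is emitted. The function name `length_window_aggregates` is hypothetical.

```python
from collections import deque


def length_window_aggregates(weights, length=3):
    """Return (weight, totalWeight, averageWeight) for each incoming event,
    computed over at most the last `length` events."""
    window = deque(maxlen=length)  # the oldest event is evicted automatically
    outputs = []
    for weight in weights:
        window.append(weight)
        total = sum(window)
        average = total / len(window)
        outputs.append((weight, total, average))
    return outputs


weights = [1000, 2000, 1500, 2000, 3000, 1000]
windowed = length_window_aggregates(weights)

for weight, total, average in windowed:
    print(f"weight={weight}, totalWeight={total}, averageWeight={average:.2f}")
```

Under these assumptions, the total for the fourth event is 5500 (2000 + 1500 + 2000) rather than 6500, because the first event has already left the window, and the final event yields a total of 6000 with an average of 2000.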