Quick Start Guide
To view a screencast of the Quick Start Guide, click here and scroll down.
WSO2 Complex Event Processor monitors the transactions and activities of an enterprise (referred to as events), analyses them and presents the results to a range of interfaces in real time. Not having to store this information enables it to process and present information with greater speed.
The following sections walk you through the basic features of the CEP to get you started.
Getting Started Video
You can also find the content here as the Getting Started Video
Before you begin,
- Install Oracle Java SE Development Kit (JDK) version 1.7* or 1.8 and set the
JAVA_HOME
environment variable. - Download WSO2 CEP.
- Start the CEP by going to
<CEP_HOME>/bin
using the command-line and executingwso2server.bat
(for Windows) orwso2server.sh
(for Linux.)
Creating a simple event flow
An event flow refers to a specific combination of event streams, event receivers, event publishers and execution plans. The following steps describe how to define these elements of an event flow.
Step 1: Add an event stream
Event stream defines the events which goes through a particular flow by defining the event's attributes and it's types. An event stream can be created in the CEP Management Console as follows.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page.
- Click Add Event Stream to open the Define New Event Stream page.
Enter information as follows to create a new event stream named
org.wso2.event.sensor.stream
.
Event Stream DetailsParameter Name Value Event Stream Name org.wso2.event.sensor.stream Event Stream Version 1.0.0 Stream Attributes
Click Add to add the attribute after entering the attribute name and attribute type.
Attribute Category Attribute Attribute Type Meta Data timestamp long isPowerSaverEnabled bool sensorId int sensorName string Correlation Data longitude double latitude double Payload Data humidity float sensorValue double Click Add Event Stream to save the information.
Step 2: Add an event receiver
Events received by the CEP server have different formats such as XML, JSON and Map. Event receivers transform these events to WSO2 Events that can be directed to an event stream defined in the CEP.
In this step, you will create an event receiver named httpReceiver
which directs events to the event stream named org.wso2.event.sensor.stream
that was created in Step 1. The receiver can be created using the Management Console as follows.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Receivers to open the Available Receivers page.
- Click Add Event Receiver to open the Create a New Event Receiver page.
Enter information as follows to create the new event receiver named
httpReceiver
.
Parameter Name Value Event Receiver Name httpReceiver Input Event Adapter Type http Transports all Event Stream org.wso2.event.sensor.stream Message Format json - Click Add Event Receiver to save the information.
Step 3: Add an Event Publisher
Event publishers publish events processed by the WSO2 servers to external applications. These events are published via HTTP, Kafka, JMS, etc. in JSON, XML, Map, text, and WSO2Event formats to various endpoints and data stores.
In this step, you will create an event publisher named loggerPublisher to publish events from the event stream named org.wso2.event.sensor.stream
that was created in Step 1. Since the output event adapter type of this publisher is logger
, the events published will be logged in the CEP CLI. The publisher can be created using the Management Console as follows.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
- Click Add Event Publisher to open the Create a New Event Publisher page.
Enter information as follows to create the new event publisher named
loggerPublisher
.
Parameter Name Description Event Publisher Name loggerPublisher Event Source org.wso2.event.sensor.stream Output Event Adapter Type logger Message Format text
Click Add Event Publisher to save the information.
Step 4: View the simple event flow
This step involves viewing the event flow you created and understanding how the different elements in it are connected. The event flow can be viewed as follows.
- Log into the CEP Management Console if you are not already logged in.
- Click the Main tab and then click Flow to open the CEP Event Flow page. The event flow you created is displayed as follows.
This diagram indicates thathttpReceiver
forwards events to theorg.wso2.event.sensor.stream.1.0.0
stream. These events are then published by theloggerPublisher
.
Receive and log events
Step 5: Receive events via HTTP transport
Navigate to <CEP_HOME>/samples/producers/http
and run the following command which sends events to the CEP via the HTTP transport.
ant -Durl=http://localhost:9763/endpoints/httpReceiver -Dsn=0001
This builds the HTTP client and sends the events in the <CEP_HOME>/samples/artifacts/0001/httpReceiver.txt file
to the httpReceiver
endpoint. You can view the details of the events that are sent as shown in the log below. These logs are published by the publisher created in Step 3.
The logs of the JSON events received by the CEP server will be displayed in the CLI as shown in the example below.
Processing events with an execution plan
Step 6: Add another event stream
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page.
- Click Add Event Stream to open the Define New Event Stream page.
- Enter information as follows to create the event stream named
org.wso2.event.sensor.filtered.stream
.
Event Stream Details
Paramater Name Value Event Stream Name org.wso2.event.sensor.filtered.stream Event Stream Version 1.0.0 Stream Attributes
Attribute Category Attribute Attribute Type Meta Date timestamp long sensorName string Correlation Data longitude double latitude double Payload Data sensorValue double Click Add Event Stream to save the information.
Step 7: Add an execution plan
An Execution Plan can import one or more streams from the server for processing and push zero or more output streams back to the server. Refer Processing Events for further explanations.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Execution Plans to open the Available Execution Plans page.
- Click Add Execution Plan to open the Create a New Execution Plan page.
Enter information as follows to create the new execution plan
Parameter Name Value Import Stream org.wso2.event.sensor.stream:1.0.0 As sensorStream Value Of filteredStream StreamId org.wso2.event.sensor.filtered.stream:1.0.0 - Click Import and then click Export. The section for query expressions will be updated as shown below.
Add the following query expression.
from sensorStream [sensorValue > 100] select meta_timestamp, meta_sensorName, correlation_longitude, correlation_latitude, sensorValue insert into filteredStream
This query includes the value
sensorValue > 100
. Therefore, when the execution plan forwards events fromorg.wso2.event.sensor.stream:1.0.0
toorg.wso2.event.sensor.filtered.stream:1.0.0
, events in which the value for thesensorValue
attribute is less than 100 will be dropped.- Click Validate Query Expressions. Once you get a message to confirm that the queries are valid, click Add Execution Plan.
Publish events in dashboard
Step 8: Add a UI event publisher
In this step, you will add another publisher named uiPublisher
to publish events from the stream named org.wso2.event.sensor.filtered stream to the Analytics Dashboard.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
- Click Add Event Publisher to open the Create a New Event Publisher page.
Enter information as follows to create the new event publisher named
UIPublisher
.
Parameter Name Description Event Publisher Name uiPublisher Event Source org.wso2.event.sensor.filtered.stream:1.0.0 Output Event Adapter Type ui Message Format wso2event Click Add Event Publisher to save the information.
Step 9: Create a dashboard and a gadget
WSO2 Analytics Dashboard will be used as the tool to analyse the output of the event flow you created in this guide. This step creates a dashboard and a gadget which analyses events from the org.wso2.event.sensor.filtered.stream
stream published by the uiPublisher
publisher.
- Log into the CEP Management Console. In the Main tab, click Analytics Dashboard.
- Log into the Analytics Dashboard with your username and password.
- Click CREATE GADGET to open the CREATE A GADGET page.
- Select
org.wso2.event.sensor.filtered.stream
for the Analytics Table/Event Stream parameter. Then click Next. Select Line for the Chart Type parameter. Then enter values as follows in the rest of the parameters relating to the chart.
Parameter Name Value Title Sensor Value VS Timestamp X-Axis TIMESTAMP Y-Axis sensorValue Color Dimension sensorName Number of Viewable Records 10 - Click Add to Gadget Store to save the gadget in the gadget store. You will return to the DASHBOARDS page.
- Click CREATE DASHBOARD to open the CREATE A DASHBOARD page.
Enter information as follows for the new dashboard and click Next.
Parameter Name Value Name of your Dashboard Sensor Statistics Description This dashboard indicates the sensor value at different times in a particular location. - Select the Single Column layout as the layout in the next page. The layout will open.
- Click the following icon to open the available gadgets.
- Drag and drop the Sensor Value VS Timestamp gadget to a grid in the layout as shown below.
Step 10: Send Events to the HTTP Receiver via Curl Command
This step sends events to the receiver named httpReceiver
using a curl command. These events are processed by the event flow you created, and published in the CEP CLI by the publisher created in Step 3.
Issue the following curl command.
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439468145264 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 96.5 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
The following log will appear in the CEP CLI.
Note that the Sensor Statistics dashboard you created does not get updated. This is because the value for the
sensorValue
attribute is less than 100 in this event. Therefore, it gets dropped by the filter you created in the execution plan, and as a result, it is not forwarded to theorg.wso2.event.sensor.filtered.
stream:1.0.0
stream.Issue another command with a value greater than 100 for the
sensorValue
attribute as follows.
curl -X POST -d "{ "event": { "metaData": { "timestamp":1439467524120 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 156 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
The following log will appear in he CEP CLI.
The event is forwarded to the
org.wso2.event.sensor.filtered.
stream:1.0.0
stream since the value for thesensorValue
attribute is greater than 100. Therefore, the Sensor Statistics dashboard will be updated as shown below.
Issue more curl commands as follows with several timestamps and sensor values.
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467524120 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 156 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467890957 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 170 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467951518 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 131 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467992936 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 126 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439468050928 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 145 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
Since the value for the
sensorValue
attribute is greater than 100 in all these events, the dashboard will be updated as shown below. These events will also be logged in the CEP CLI.Send sensor events with several timestamps and sensorValues, The dashboard will now get effected with the sent event since sensorValue is over 100 and the event gets sent to the dashboard. These events will get logged in the CEP CLI as well.
Deploying execution plans using templates
Step 11: Create and deploy a template
The following is the configuration of the execution plan you created in this guide as a template. This configuration should be saved in a XML file in the <CEP_HOME>/repository/conf/cep/domain-template
directory.
$filteringVal
can be configured to the expected filtering value from the Execution manager.
<templateDomain name="SensorStatistics"> <description>Sensor Statistics Analysis</description> <templates> <template name="Filtering template"> <description>To filter the sensor events with a given sensor value</description> <executionPlan> <![CDATA[ /* Enter a unique ExecutionPlan */ @Plan:name('ExecutionPlan') /* Enter a unique description for ExecutionPlan */ -- @Plan:description('ExecutionPlan') /* define streams/tables and write queries here ... */ @Import('org.wso2.event.sensor.stream:1.0.0') define stream sensorStream (meta_timestamp long, meta_isPowerSaverEnabled bool, meta_sensorId int, meta_sensorName string, correlation_longitude double, correlation_latitude double, humidity float, sensorValue double); @Export('org.wso2.event.sensor.filtered.stream:1.0.0') define stream filteredStream (meta_timestamp long, meta_sensorName string, correlation_longitude double, correlation_latitude double, sensorValue double); from sensorStream [sensorValue > $filteringVal] select meta_timestamp, meta_sensorName, correlation_longitude, correlation_latitude, sensorValue insert into filteredStream ]]> </executionPlan> <parameters> <parameter name="filteringVal" type="double"> <displayName>Filtering Value</displayName> <description>Only the sensor values below filtering value will be dropped</description> <defaultValue>100</defaultValue> </parameter> </parameters> </template> </templates> <streams> <stream> { "name": "org.wso2.event.sensor.filtered.stream", "version": "1.0.0", "nickName": "", "description": "", "metaData": [ { "name": "timestamp", "type": "LONG" }, { "name": "sensorName", "type": "STRING" } ], "correlationData": [ { "name": "longitude", "type": "DOUBLE" }, { "name": "latitude", "type": "DOUBLE" } ], "payloadData": [ { "name": "sensorValue", "type": "DOUBLE" } ] } </stream> <stream> { "name": "org.wso2.event.sensor.stream", "version": "1.0.0", "nickName": "", "description": "", "metaData": [ { "name": "timestamp", "type": "LONG" }, { "name": "isPowerSaverEnabled", "type": "BOOL" }, { "name": "sensorId", "type": "INT" }, { "name": "sensorName", "type": "STRING" } ], "correlationData": [ { "name": "longitude", "type": "DOUBLE" }, { "name": "latitude", "type": "DOUBLE" } ], "payloadData": [ { "name": "humidity", "type": "FLOAT" }, { "name": "sensorValue", "type": "DOUBLE" } ] } </stream> </streams> </templateDomain>
Step 12: Configure a template
Before you carry out this step, delete the event streams you created under Step1: Add an event stream, and Step 6: Add another event stream.
This step involves adding execution plan and stream configurations using the template you created and added in the previous step.
- If the CEP server was running when you created and deployed the template, restart the CEP server.
- Log into the CEP Management Console. Click the Main tab and then click Execution Manager. The dashboard home page with the available domains will be displayed as follows.
- Click on SensorStatistics to open the Configurations page. Then click Add Configuration.
Enter information as follows and then click Add Configuration.
Parameter Name Value Configuration Name FilterSensorValues Description Filter events which Filtering Value 125.6 You will receive a message to confirm that the configurations were successfully saved. Close the message. The configuration added will be displayed in the Configurations page as follows.
Step 13: View the elements of the event flow
Log into the CEP Management Console and click the Main tab. Then click Flow to open the CEP Event Flow page. The complete event flow you created in this guide will be displayed as follows.
The following is a summary of this guide which describes each element in the event flow.