To view a screencast of the Quick Start Guide, click here.
WSO2 Complex Event Processor monitors the transactions and activities of an enterprise (referred to as events), analyses them and presents the results to a range of interfaces in real time. Not having to store this information enables it to process and present information with greater speed.
The following sections walk you through the basic features of the CEP to get you started.
Getting Started Video
You can also find the content here as the Getting Started Video
Before you begin,
- Install Oracle Java SE Development Kit (JDK) version 1.7* or 1.8 and set the
JAVA_HOME
environment variable. - Download WSO2 CEP.
- Start the CEP by going to
<CEP_HOME>/bin
using the command-line and executingwso2server.bat
(for Windows) orwso2server.sh
(for Linux.)
Creating a simple event flow
An event flow refers to a specific combination of event streams, event receivers, event publishers and execution plans. The following steps describe how to define these elements of an event flow.
Step 1: Add an event stream
Event stream defines the events which goes through a particular flow by defining the event's attributes and it's types. An event stream can be created in the CEP Management Console as follows.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page.
- Click Add Event Stream to open the Define New Event Stream page.
Enter information as follows to create a new event stream named
org.wso2.event.sensor.stream
.
Event Stream DetailsParameter Name Value Event Stream Name org.wso2.event.sensor.stream Event Stream Version 1.0.0 Stream Attributes
Click Add to add the attribute after entering the attribute name and attribute type.
Attribute Category Attribute Attribute Type Meta Data timestamp long isPowerSaverEnabled bool sensorId int sensorName string Correlation Data longitude double latitude double Payload Data humidity float sensorValue double Click Add Event Stream to save the information.
Step 2: Add an event receiver
Events received by the CEP server have different formats such as XML, JSON and Map. Event receivers transform these events to WSO2 Events that can be directed to an event stream defined in the CEP.
In this step, you will create an event receiver named httpReceiver
which directs events to the event stream named org.wso2.event.sensor.stream
that was created in Step 1. The receiver can be created using the Management Console as follows.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Receivers to open the Available Receivers page.
- Click Add Event Receiver to open the Create a New Event Receiver page.
Enter information as follows to create the new event receiver named
httpReceiver
.
Parameter Name Value Event Receiver Name httpReceiver Input Event Adapter Type http Transports all Event Stream org.wso2.event.sensor.stream Message Format json - Click Add Event Receiver to save the information.
Step 3: Add an Event Publisher
Event publishers publish events processed by the WSO2 servers to external applications. These events are published via HTTP, Kafka, JMS, etc. in JSON, XML, Map, text, and WSO2Event formats to various endpoints and data stores.
In this step, you will create an event publisher named loggerPublisher to publish events from the event stream named org.wso2.event.sensor.stream
that was created in Step 1. Since the output event adapter type of this publisher is logger
, the events published will be logged in the CEP CLI. The publisher can be created using the Management Console as follows.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
- Click Add Event Publisher to open the Create a New Event Publisher page.
Enter information as follows to create the new event publisher named
loggerPublisher
.
Parameter Name Description Event Publisher Name loggerPublisher Event Source org.wso2.event.sensor.stream Output Event Adapter Type logger Message Format text
Click Add Event Publisher to save the information.
Step 4: View the simple event flow
This step involves viewing the event flow you created and understanding how the different elements in it are connected. The event flow can be viewed as follows.
- Log into the CEP Management Console if you are not already logged in.
- Click the Main tab and then click Flow to open the CEP Event Flow page. The event flow you created is displayed as follows.
This diagram indicates thathttpReceiver
forwards events to theorg.wso2.event.sensor.stream.1.0.0
stream. These events are then published by theloggerPublisher
.
Receive and log events
Step 5: Receive events via HTTP transport
Navigate to <CEP_HOME>/samples/cep/producers/http
and run the following command which sends events to the CEP via the HTTP transport.
ant -Durl=http://localhost:9763/endpoints/httpReceiver -Dsn=0001
This builds the HTTP client and sends the events in the <CEP_HOME>/samples/cep/artifacts/0001/httpReceiver.txt file
to the httpReceiver
endpoint. You can view the details of the events that are sent as shown in the log below. These logs are published by the publisher created in Step 3.
The logs of the JSON events received by the CEP server will be displayed in the CLI as shown in the example below.
Processing events with an execution plan
Step 6: Add another event stream
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page.
- Click Add Event Stream to open the Define New Event Stream page.
- Enter information as follows to create the event stream named
org.wso2.event.sensor.filtered.stream
.
Event Stream Details
Paramater Name Value Event Stream Name org.wso2.event.sensor.filtered.stream Event Stream Version 1.0.0 Stream Attributes
Attribute Category Attribute Attribute Type Meta Date timestamp long sensorName string Correlation Data longitude double latitude double Payload Data sensorValue double Click Add Event Stream to save the information.
Step 7: Add an execution plan
An Execution Plan can import one or more streams from the server for processing and push zero or more output streams back to the server. Refer Processing Events for further explanations.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Execution Plans to open the Available Execution Plans page.
- Click Add Execution Plan to open the Create a New Execution Plan page.
Enter information as follows to create the new execution plan
Parameter Name Value Import Stream org.wso2.event.sensor.stream:1.0.0 As sensorStream Value Of filteredStream StreamId org.wso2.event.sensor.filtered.stream:1.0.0 - Click Import and then click Export. The section for query expressions will be updated as shown below.
Add the following query expression.
from sensorStream [sensorValue > 100] select meta_timestamp, meta_sensorName, correlation_longitude, correlation_latitude, sensorValue insert into filteredStream
This query includes the value
sensorValue > 100
. Therefore, when the execution plan forwards events fromorg.wso2.event.sensor.stream:1.0.0
toorg.wso2.event.sensor.filtered.stream:1.0.0
, events in which the value for thesensorValue
attribute is less than 100 will be dropped.- Click Validate Query Expressions. Once you get a message to confirm that the queries are valid, click Add Execution Plan.
Publish events in dashboard
Step 8: Add a UI event publisher
In this step, you will add another publisher named uiPublisher
to publish events from the stream named org.wso2.event.sensor.filtered stream to the Analytics Dashboard.
- Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
- Click Add Event Publisher to open the Create a New Event Publisher page.
Enter information as follows to create the new event publisher named
UIPublisher
.
Parameter Name Description Event Publisher Name uiPublisher Event Source org.wso2.event.sensor.filtered.stream:1.0.0 Output Event Adapter Type ui Message Format wso2event Click Add Event Publisher to save the information.
Step 9: Create a dashboard and a gadget
WSO2 Analytics Dashboard will be used as the tool to analyse the output of the event flow you created in this guide. This step creates a dashboard and a gadget which analyses events from the org.wso2.event.sensor.filtered.stream
stream published by the uiPublisher
publisher.
- Log into the CEP Management Console. In the Main tab, click Analytics Dashboard.
- Log into the Analytics Dashboard with your username and password.
- Click the menu icon and then click Gadgets to open the Gadgets page as demonstrated below.
- Click GENERATE GADGET, and enter values in the Generate a Gadget wizard as follows.
- In the Select Provider field, select Realtime Data Source. Then click Next.
- In the Event Stream field, select org.wso2.event.sensor.filtered.stream:1.0.0. Then click Next.
Configure a chart as follows.
Parameter Name Value Gadget Name Sensor Value VS Timestamp Select Chart Type Line Chart X-Axis TIMESTAMP X type time Y-Axis sensorValue Y type default Color domain sensorName Max length 30 - Click Add to Store, and then click Go to Portal. the Dashboards page appears again.
- Click CREATE DASHBOARD to open the Create a Dashboard page. Configure a new dashboard as follows.
Enter a name and a description for the new dashboard as follows, and click Next.
Parameter Name Value Name of your Dashboard Sensor Statistics Description This dashboard indicates the sensor value at different times in a particular location. - Select the Single Column layout. A message appears to indicate that the dashboard is successfully created.
- Click the icon for gadgets. Then select and drag Sensor Value VS Timestamp gadget to the first column as demonstrated above.
Step 10: Send Events to the HTTP Receiver via Curl Command
This step sends events to the receiver named httpReceiver
using a curl command. These events are processed by the event flow you created, and published in the CEP CLI by the publisher created in Step 3.
Issue the following curl command.
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439468145264 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 96.5 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
The following log will appear in the CEP CLI.
Note that the Sensor Statistics dashboard you created does not get updated. This is because the value for the
sensorValue
attribute is less than 100 in this event. Therefore, it gets dropped by the filter you created in the execution plan, and as a result, it is not forwarded to theorg.wso2.event.sensor.filtered.
stream:1.0.0
stream.Issue another command with a value greater than 100 for the
sensorValue
attribute as follows.
curl -X POST -d "{ "event": { "metaData": { "timestamp":1439467524120 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 156 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
The following log will appear in he CEP CLI.
The event is forwarded to the
org.wso2.event.sensor.filtered.
stream:1.0.0
stream since the value for thesensorValue
attribute is greater than 100. Therefore, the Sensor Statistics dashboard will be updated as shown below.
Issue more curl commands as follows with several timestamps and sensor values.
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467524120 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 156 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467890957 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 170 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467951518 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 131 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439467992936 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 126 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
curl -X POST -d "{ "event": { "metaData": { "timestamp": 1439468050928 , "isPowerSaverEnabled": false, "sensorId": 701, "sensorName": temperature }, "correlationData": { "longitude": 4.504343, "latitude": 20.44345 }, "payloadData": { "humidity": 2.3, "sensorValue": 145 } } }" http://localhost:9763/endpoints/httpReceiver --header "Content-Type:application/json"
Since the value for the
sensorValue
attribute is greater than 100 in all these events, the dashboard will be updated as shown below. These events will also be logged in the CEP CLI.Send sensor events with several timestamps and sensorValues, The dashboard will now get effected with the sent event since sensorValue is over 100 and the event gets sent to the dashboard. These events will get logged in the CEP CLI as well.
Deploying execution plans using templates
Step 11: Create and deploy a template
In this step, the configurations of the WSO2 CEP artifacts that you previously created are added as templates via the Template Manager tool. This allows you to reuse the same artifacts for different scenarios where the sensor value differs.
Copy the following template, and save it with the SensorStatistics.xml
file name in the <CEP_HOME>/repository/conf/template-manager/domain-template
directory.
The $sensorValue
attribute in this template is defined as a configurable parameter by using the $
sign in the attribute name. Therefore, in each scenario you create from this template, you can specify a different sensor value based on which the events are to be filtered.
<?xml version="1.0"?> <!-- ~ Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. ~ ~ WSO2 Inc. licenses this file to you under the Apache License, ~ Version 2.0 (the "License"); you may not use this file except ~ in compliance with the License. ~ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 ~ ~ Unless required by applicable law or agreed to in writing, ~ software distributed under the License is distributed on an ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ~ KIND, either express or implied. See the License for the ~ specific language governing permissions and limitations ~ under the License. --> <domain name="SensorStatistics"> <description>Domain for sensor data analysis</description> <scenarios> <scenario type="AnalyzeSensorStatistics"> <description>Configure a sensor analytics scenario to display statistics for a given stream of your choice </description> <templates> <!--Note: These will be deployed in the order they appear here--> <!-- Input Event Stream--> <template type="eventstream"> { "streamId": "org.wso2.event.sensor.stream:1.0.0", "name": "org.wso2.event.sensor.stream", "version": "1.0.0", "nickName": "", "description": "", "metaData": [ { "name": "timestamp", "type": "LONG" }, { "name": "isPowerSaverEnabled", "type": "BOOL" }, { "name": "sensorId", "type": "INT" }, { "name": "sensorName", "type": "STRING" } ], "correlationData": [ { "name": "longitude", "type": "DOUBLE" }, { "name": "latitude", "type": "DOUBLE" } ], "payloadData": [ { "name": "humidity", "type": "FLOAT" }, { "name": "sensorValue", "type": "DOUBLE" } ] } </template> <!-- Output Event Stream--> <template type="eventstream"> { "streamId": "org.wso2.event.sensor.filtered.stream:1.0.0", "name": "org.wso2.event.sensor.filtered.stream", "version": "1.0.0", "nickName": "", "description": "", "metaData": [ { "name": "timestamp", "type": "LONG" }, { "name": "sensorName", "type": "STRING" } ], "correlationData": [ { "name": "longitude", "type": "DOUBLE" }, { "name": "latitude", "type": "DOUBLE" } ], "payloadData": [ { "name": "sensorValue", "type": "DOUBLE" } ] } </template> <!-- Realtime Execution Plan--> <template type="realtime"> <![CDATA[ /* Enter a unique ExecutionPlan */ @Plan:name('ExecutionPlan') /* Enter a unique description for ExecutionPlan */ -- @Plan:description('ExecutionPlan') /* define streams/tables and write queries here ... */ @Import('org.wso2.event.sensor.stream:1.0.0') define stream sensorStream (meta_timestamp long, meta_isPowerSaverEnabled bool, meta_sensorId int, meta_sensorName string, correlation_longitude double, correlation_latitude double, humidity float, sensorValue double); @Export('org.wso2.event.sensor.filtered.stream:1.0.0') define stream filteredStream (meta_timestamp long, meta_sensorName string, correlation_longitude double, correlation_latitude double, sensorValue double); from sensorStream [sensorValue > $filteringVal] select meta_timestamp, meta_sensorName, correlation_longitude, correlation_latitude, sensorValue insert into filteredStream ]]> </template> <template type="eventreceiver"> <![CDATA[ <?xml version="1.0" encoding="UTF-8"?> <eventReceiver name="httpReceiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <from eventAdapterType="http"> <property name="basicAuthEnabled">true</property> <property name="transports">all</property> </from> <mapping customMapping="disable" type="json"/> <to streamName="org.wso2.event.sensor.stream" version="1.0.0"/> </eventReceiver>]]> </template> <!-- Event Publisher--> <template type="eventpublisher"> <![CDATA[ <?xml version="1.0" encoding="UTF-8"?> <eventPublisher name="loggerPublisher" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher"> <from streamName="org.wso2.event.sensor.stream" version="1.0.0"/> <mapping customMapping="disable" type="text"/> <to eventAdapterType="logger"/> </eventPublisher> ]]> </template> <template type="eventpublisher"> <![CDATA[ <?xml version="1.0" encoding="UTF-8"?> <eventPublisher name="uiPublisher" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher"> <from streamName="org.wso2.event.sensor.filtered.stream" version="1.0.0"/> <mapping customMapping="disable" type="wso2event"/> <to eventAdapterType="ui"/> </eventPublisher> ]]> </template> <!-- Gadget line chart --> <template type="gadget"> <config> <properties> <property name="directoryName">$sensorType-line-chart</property> <property name="templateDirectory">lineChart</property> </properties> <artifacts> <artifact file="gadget.json"> <![CDATA[ { "id": "$sensorType-line-chart", "title": "$sensorType-line-chart", "type": "gadget", "thumbnail": "gadget/$sensorType-line-chart/thumbnail.png", "data": { "url": "gadget/$sensorType-line-chart/gadget.xml" } } ]]> </artifact> <artifact file="conf.json"> <![CDATA[ { "provider-conf" : { "streamName" : "org.wso2.event.sensor.filtered.stream:1.0.0", "provider-name" : "realtime" }, "chart-conf" : { "x" : "TIMESTAMP", "xType" : "time", "y" : "sensorValue", "yType" : "default", "color" : "sensorName", "maxLength" : "30", "gadget-name" : "$sensorType-line-chart", "chart-name" : "line-chart" } } ]]> </artifact> <artifact file="js/core/gadget-util.js"> <![CDATA[ var getGadgetLocation = function (callback) { var gadgetLocation = "/portal/store/carbon.super/fs/gadget/$sensorType-line-chart"; var PATH_SEPERATOR = "/"; if (gadgetLocation.search("store") != -1) { wso2.gadgets.identity.getTenantDomain(function (tenantDomain) { var gadgetPath = gadgetLocation.split(PATH_SEPERATOR); var modifiedPath = ''; for (var i = 1; i < gadgetPath.length; i++) { if (i === 3) { modifiedPath = modifiedPath.concat(PATH_SEPERATOR, tenantDomain); } else { modifiedPath = modifiedPath.concat(PATH_SEPERATOR, gadgetPath[i]) } } callback(modifiedPath); }); } else { callback(gadgetLocation); } } ]]> </artifact> </artifacts> </config> </template> <!-- Gadget line chart --> <!-- Dashboard --> <template type="dashboard"> <config> <properties> <property name="dashboardId">analytics-$sensorType-dashboard</property> </properties> <content> <![CDATA[ { "id": "analytics-$sensorType-dashboard", "title": "Analytics $sensorType Dashboard", "description": "This dashboard indicates the sensor value at different times in a particular location", "permissions": { "viewers": [ "Internal\/sensor-statistics-viewer" ], "editors": [ "Internal\/sensor-statistics-editor" ], "owners": [ "Internal\/sensor-statistics-owner" ] }, "pages": [ { "id": "landing", "title": "Home", "layout": { "content": { "loggedIn": { "blocks": [ { "id": "90dfe9100dc10dca1ae562e7f7451a4a", "x": 0, "y": 0, "width": 12, "height": 3, "banner": false } ] } }, "fluidLayout": false }, "isanon": false, "content": { "default": { "90dfe9100dc10dca1ae562e7f7451a4a": [ { "id": "$sensorType-line-chart-0", "content": { "id": "$sensorType-line-chart", "title": "$sensorType-line-chart", "type": "gadget", "thumbnail": "fs:\/\/gadget\/$sensorType-line-chart\/thumbnail.png", "data": { "url": "fs:\/\/gadget\/$sensorType-line-chart\/gadget.xml" }, "styles": { "title": "$sensorType-line-chart", "borders": true }, "options": { "windowSize": { "type": "STRING", "title": "Window Size", "value": "10", "options": [ ], "required": false } }, "locale_titles": { } } } ] }, "anon": { } } } ], "menu": [ { "id": "landing", "isanon": false, "ishidden": false, "title": "Home", "subordinates": [ ] } ], "hideAllMenuItems": false, "identityServerUrl": "", "accessTokenUrl": "", "apiKey": "", "apiSecret": "", "theme": "Default Theme", "shareDashboard": false, "isUserCustom": false, "isEditorEnable": true, "banner": { "globalBannerExists": false, "customBannerExists": false }, "landing": "landing", "isanon": false } ]]> </content> </config> </template> </templates> <parameters> <parameter name="filteringVal" type="string"> <displayName>Filtering Value</displayName> <description>Only the sensor values below filtering value will be dropped</description> <defaultValue>100</defaultValue> </parameter> <parameter name="sensorType" type="string"> <displayName>Sensor Type Name</displayName> <description>The name of the sensor type</description> <defaultValue>temperature</defaultValue> </parameter> </parameters> </scenario> </scenarios> </domain>
Step 12: Configure a template
Before you carry out this step
- Copy
Sensor Value VS Timestamp
gadget which resides in {PRODUCT_HOME}/repository/deployment/server/jaggeryapps/portal/store/carbon.super/fs/gadget/ and copy the folder to wso2cep-4.2.0/repository/conf/template-manager/gadget-templates and rename it as lineChart - delete the following artifacts that you have already configured in steps 1 - 9.
org.wso2.event.sensor.stream
event streamorg.wso2.event.sensor.filtered.stream event stream
httpReceiver
event receiverExecutionPlan
execution planloggerPublisher
event publisheruiPublisher
event publisherSensor Value VS Timestamp
gadgetSensor Statistics
dashboard
This step involves adding execution plan and stream configurations using the template you created and added in the previous step.
- If the CEP server was running when you created and deployed the template, restart the CEP server.
- Log into the CEP Management Console. Click the Main tab and then click Template Manager. Create a new scenario as demonstrated below.
- Click on SensorStatistics to open the Deployed Scenarios page. Then click Create New Scenario to open the Edit Scenario page.
Enter information as shown in the table below and click Add scenario.
Parameter Name Value Scenario Type AnalyzeSensorStatistics
Scenario Name FilterSensorValues Description Filter events with a sensor value greater than 120 Sensor Value 120 A message appears to inform you that the scenario is successfully created. Close the message. The scenario you configured is displayed in the Deployed Scenarios page
Step 13: View the elements of the event flow
Log into the CEP Management Console and click the Main tab. Then click Flow to open the CEP Event Flow page. The complete event flow you created in this guide is displayed as follows.
This event flow is displayed when you create the artifacts manually, as well as when you deploy them in a template and then create a scenario.
The following is a summary of this guide which describes each element in the event flow.