This documentation is for WSO2 Complex Event Processor 2.0.0.

KPI Analyzer

This sample demonstrates how the Siddhi engine can be used with the Agent event broker to receive, process, and publish Tuple messages.

In this sample, CEP receives phone retail information and fires an output event whenever a buyer has bought more than 3 phones for a total price higher than $2500.

from phoneRetailStream[totalPrice>2500 and quantity>3]
insert into highPurchaseStream 
buyer, brand, quantity, totalPrice; 
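
To make the filter condition concrete, here is a minimal Python sketch of what the query above selects. It is not Siddhi code and does not use any CEP API; the function name high_purchases and the sample events are hypothetical and exist only for illustration.

```python
from typing import Dict, Iterable, List

# Attributes projected by the "insert into highPurchaseStream" clause.
PROJECTED = ("buyer", "brand", "quantity", "totalPrice")


def high_purchases(events: Iterable[Dict]) -> List[Dict]:
    """Keep events where totalPrice > 2500 and quantity > 3 (both strict)."""
    selected = []
    for event in events:
        if event["totalPrice"] > 2500 and event["quantity"] > 3:
            # Copy only the projected attributes into the output event.
            selected.append({key: event[key] for key in PROJECTED})
    return selected


# Hypothetical sample events, not taken from the sample client.
sample_events = [
    {"buyer": "alice", "brand": "BrandA", "quantity": 5, "totalPrice": 3000},
    {"buyer": "bob", "brand": "BrandB", "quantity": 3, "totalPrice": 4000},
    {"buyer": "carol", "brand": "BrandC", "quantity": 10, "totalPrice": 2500},
]

result = high_purchases(sample_events)
print(result)
```

Because both comparisons are strict, an event with exactly 3 phones or a total price of exactly 2500 is not forwarded; only the first sample event passes the filter.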

In this sample, events are published using a custom Data-Bridge data publisher, and the output events from the bucket are sent to a test server that receives and logs them.

The following configuration is used in this sample.

<bucket name="KPIAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
Notifies when a user purchases more than 3 phones for a total price higher than $2500.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="org.wso2.phone.retail.store/1.2.0" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream"><!-- queryEventType="Tuple" -->
            <property name="brand" inputName="brand" inputDataType="payloadData"
                      type="java.lang.String"/>
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
                      type="java.lang.String"/>
        </tupleMapping>
    </input>
    <query name="KPIQuery">
        <expression>
from phoneRetailStream[totalPrice>2500 and quantity>3]
insert into highPurchaseStream
buyer, brand, quantity, totalPrice;
        </expression>
        <output topic="org.wso2.high.purchase.buyers/1.5.0" brokerName="externalAgentBroker">
            <tupleMapping>
                <metaData>
                    <property name="buyer" valueOf="buyer"
                              type="java.lang.String"/>
                </metaData>
                <correlationData/>
                <payloadData>
                    <property name="brand" valueOf="brand" type="java.lang.String"/>
                    <property name="quantity" valueOf="quantity" type="java.lang.Integer"/>
                    <property name="purchasePrice" valueOf="totalPrice" type="java.lang.Integer"/>
                </payloadData>
            </tupleMapping>
        </output>
    </query>
</bucket>
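
The two tupleMapping elements in the configuration rename attributes on the way in and on the way out: the incoming payload field total becomes totalPrice in phoneRetailStream, and totalPrice is published as purchasePrice. The following Python sketch illustrates that renaming under stated assumptions. The functions map_input and map_output and the dictionary layout are hypothetical; they mirror the structure of the XML and do not represent the actual Data-Bridge wire format or any CEP API.

```python
def map_input(payload):
    """Mirror the input tupleMapping: payloadData fields -> stream attributes."""
    return {
        "brand": str(payload["brand"]),
        "quantity": int(payload["quantity"]),
        # inputName="total" is exposed to the query as "totalPrice".
        "totalPrice": int(payload["total"]),
        "buyer": str(payload["buyer"]),
    }


def map_output(event):
    """Mirror the output tupleMapping: stream attributes -> output tuple sections."""
    return {
        "metaData": {"buyer": event["buyer"]},
        "correlationData": {},  # empty in the configuration
        "payloadData": {
            "brand": event["brand"],
            "quantity": event["quantity"],
            # valueOf="totalPrice" is published as "purchasePrice".
            "purchasePrice": event["totalPrice"],
        },
    }


# Hypothetical incoming payload, invented for illustration.
incoming = {"brand": "BrandA", "quantity": 5, "total": 3000, "buyer": "alice"}
stream_event = map_input(incoming)
outgoing = map_output(stream_event)
print(outgoing)
```

Note that the buyer attribute moves from the payload section on input to the metaData section on output.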

Prerequisites

Deploying the configurations

The steps are as follows:

  1. Install the WSO2 Complex Event Processor, but do not start the server. Refer to the Installation Guide for instructions.
  2. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  3. From there, type ant deploy-agent.
    This copies broker-manager-config.xml to the <CEP_HOME>/repository/conf directory and the bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets directory.
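
After running the deploy target, it may be helpful to confirm that the files landed where the step above says they should. The following Python sketch checks for broker-manager-config.xml and lists whatever is in the cepbuckets directory. The function check_deployment and the use of a CEP_HOME environment variable are assumptions made for this illustration; the bucket file name is not stated in this document, so the sketch only lists the directory contents instead of looking for a specific file.

```python
import os
from pathlib import Path


def check_deployment(cep_home):
    """Report whether the broker config exists and which bucket files are deployed."""
    home = Path(cep_home)
    broker_config = home / "repository" / "conf" / "broker-manager-config.xml"
    bucket_dir = home / "repository" / "deployment" / "server" / "cepbuckets"
    # List bucket files only if the directory exists; otherwise report none.
    if bucket_dir.is_dir():
        bucket_files = sorted(p.name for p in bucket_dir.iterdir() if p.is_file())
    else:
        bucket_files = []
    return {
        "broker_config_present": broker_config.is_file(),
        "bucket_files": bucket_files,
    }


if __name__ == "__main__":
    # CEP_HOME is assumed to point at the CEP installation directory.
    print(check_deployment(os.environ.get("CEP_HOME", ".")))
```

If broker_config_present is False or bucket_files is empty, the deploy target most likely did not run against the intended installation directory.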

Starting test server

The steps are as follows:

  1. In a new command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant agentTestServer.
    This starts the test server, which receives the output events of CEP.

Running the Client

The steps are as follows:

  1. In a command prompt, switch to the CEP samples directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant agentPhoneRetailClient.
    This sends 20 Thrift events to CEP.

Observation

You will be able to observe the stream definition (at the first invocation) followed by the filtered events in the test server console, as shown below.