Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Introduction

This sample demonstrates how to track orders and delivery information of a pizza shop. It keeps track of pizza orders and their delivery information, and to sends notifications when the delivery is done within a given time period. It uses Event Tables that facilitate storing and querying of events, and time windows for buffering events temporarily for a short time period. This sample uses wso2event as inputs and outputsuse external time windows for a fraud detection use-case. In this sample, we look for two or more transactions done within a very short period of time and send an alert immediately when such an occurrence is detected.

The query used in this sample is as follows:

Code Block
definefrom table pizza_deliveries
(deliveredTime long, order_id string) from ('datasource.name'='cepSampleDataSource', 'table.name'='cepSampleTable');

Using the above query, define a table (in-memory event table) that stores two attributes named deliveredTime and order_id.

Code Block
from deliveryStream 
select time, orderNo
insert into pizza_deliveries;

Insert the time and orderNo attribute values of each event received through the deliveryStream to the pizza_deliveries table defined earlier.

Code Block
from orderStream#window.time(30 seconds) 
insert into overdueDeliveries for expired-events;

A separate stream named orderStream receives order information. In this sample, we buffer them for 30 seconds (in a real business scenario, the buffer time can be much longer). Then we send the buffered information out to trigger the final query (given below), which handles the actual notification. We put out only expired events to the overdueDeliveries so that the overdueDeliveries stream always receives events 30 seconds after the actual order is placed. We can use these events to trigger other queries that analyze whether the pizza is delivered within 30 seconds.  

Code Block
from overdueDeliveries as overdueStream unidirectional join pizza_deliveries 
on pizza_deliveries.order_id == overdueStream.orderNo 
select count(overdueStream.orderNo) as sumOrderId, overdueStream.customerName
insert into deliveredOrders;

The above query,

...

  • Events received through the overdueDeliveries arrive 30 seconds after the actual order happens
  • A result of this join operation indicates (since we match the order ids) that the order is already delivered within 30 seconds

...

atm_transactions#window.externalTime(meta_timestamp, 60 sec)
select cardNumber, count(cardNumber) as transactionCount, sum(amount) as totalAmount 
group by cardNumber   
insert current events into transactions_per_card ;			
				
from transactions_per_card[transactionCount > 1] 
select cardNumber, transactionCount, totalAmount 
insert into alert_stream;

The first query uses a 60-second external time window, which keeps events based on the time of the meta_timestamp attribute. Upon arrival of each new event, it gets a count of the transactions so far (last 60 seconds), sum of the amount per each card and emits the results to an intermediate stream named transactions_per_card.

The second query looks for the condition where more than one transaction has taken place for a specific card and sends an alert. 

Prerequisites

See Prerequisites in CEP Samples Setup page.

Before starting the server, you need to add the MySQL JDBC driver  Download it and place the jar inside <CEP_HOME>/repository/components/lib. You also need to have MySQL installed and running before executing this sample. Create a new database in MySQL which is to be used when creating the cepSampleDataSource below.

Start the server normally and create a new data source named 'cepSampleDataSource' using the Configure --> Data Sources page in the management console. for more information on adding a datasource see Adding Datasources. Once this is done, shutdown the server. (Here cepSampleDataSource ponits to  the newly created or already existing database)

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 01090114. For instructions, see Starting sample CEP configurations. This sample configuration does the following:

...

  • Points the default Axis2 repo to <CEP_HOME>/sample/artifacts/01090114 (by default, the Axis2 repo is <CEP_HOME>/repository/deployment/server).

Executing the sample

  1. Open a terminal, go to <CEP_HOME>/samples/consumersproducers/wso2-event and run ant from there.
    It builds the sample wso2event consumer and executes it.

    Info

    Do not close this terminal. It is required to keep the server running and receiving events.

    Open another terminal, go to <CEP_HOME>/samples/producers/pizza-shop and run the following command:

    Code Block
    ant pizzaPublisherClient -DstreamId=org.wso2.sample.atm.transactions:1.0.0 -Dsn=0114

    It builds and runs the wso2event producer, which sends sample pizza orders and delivery data ATM transaction events to the CEP server.

  2. From the terminal opened in step 2, see the details of the events sent. Note that it takes around 30 seconds to complete sending the events as we do time-based analysis in this sample. 

    Info

    To configure host, port, username, password and No. of events use -Dhost=xxxx -Dport=xxxx -Dusername=xxxx -Dpassword=xxxx -Devents=xx.

    For example : ant -Devents=10

    For example

  3. After sending events, you can see the outputs from the CEP console (the outputs are logged by the logger which we use for this sample), given below is part of the console output of the consumer logger when sending events from the producer.

    Image Removed

...

  1. Image Added