Introduction
This sample demonstrates how to track orders and delivery information of a pizza shop. It keeps track of pizza orders and their delivery information, and sends notifications when a delivery is completed within a given time period. It uses event tables, which facilitate storing and querying events, and time windows, which buffer events temporarily for a short time period. This sample uses wso2event for both inputs and outputs.
The queries used in this sample are as follows:
define table pizza_deliveries (deliveredTime long, order_id string);
The above query defines a table (an in-memory event table) that stores two attributes named deliveredTime and order_id.
from deliveryStream select time, orderNo insert into pizza_deliveries;
The above query inserts the time and orderNo attribute values of each event received through the deliveryStream into the pizza_deliveries table defined earlier.
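The table definition and the insert query above can be pictured with a small Python sketch. It is only an illustration of the described behaviour, not how the CEP engine implements event tables: the `DeliveryEvent` class, the `on_delivery` function, and the sample values are hypothetical names introduced here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DeliveryEvent:
    """Hypothetical stand-in for one event on deliveryStream."""
    time: int       # delivery timestamp (assumed to be in milliseconds)
    orderNo: str


# Stand-in for the in-memory event table pizza_deliveries.
# Each row holds the two attributes named in the table definition.
pizza_deliveries = []


def on_delivery(event, table):
    """Mimic 'select time, orderNo insert into pizza_deliveries':
    the values are stored by position as deliveredTime and order_id."""
    table.append({"deliveredTime": event.time, "order_id": event.orderNo})


# Sample values are made up for illustration.
on_delivery(DeliveryEvent(time=1000, orderNo="order-1"), pizza_deliveries)
on_delivery(DeliveryEvent(time=2500, orderNo="order-2"), pizza_deliveries)
```

After these two calls the list holds one row per delivery event, which is the state the later join query reads from.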
from orderStream#window.time(30 seconds) insert into overdueDeliveries for expired-events;
A separate stream named orderStream receives order information. In this sample, we buffer these events for 30 seconds (in a real business scenario, the buffer time can be much longer). Then we send the buffered events out to trigger the final query (given below), which handles the actual notification. We emit only expired events to the overdueDeliveries stream, so that it always receives each event 30 seconds after the actual order is placed. We can use these events to trigger other queries that analyze whether the pizza was delivered within 30 seconds.
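The expired-events behaviour of the 30-second time window can be sketched as follows. This is a simplified model under stated assumptions: time is passed in explicitly as milliseconds, and the `TimeWindow` class, its methods, and the sample orders are hypothetical names, not part of the CEP API.

```python
from collections import deque

WINDOW_MS = 30_000  # 30 seconds, as in the sample query


class TimeWindow:
    """Simplified model of a time window that emits only expired events."""

    def __init__(self, length_ms):
        self.length_ms = length_ms
        self._buffer = deque()  # (arrival time in ms, event), oldest first

    def add(self, now_ms, event):
        """Buffer a new event; return any events that expired by now_ms."""
        expired = self.advance(now_ms)
        self._buffer.append((now_ms, event))
        return expired

    def advance(self, now_ms):
        """Return events that have been buffered for at least length_ms."""
        expired = []
        while self._buffer and now_ms - self._buffer[0][0] >= self.length_ms:
            expired.append(self._buffer.popleft()[1])
        return expired


window = TimeWindow(WINDOW_MS)
window.add(0, {"orderNo": "order-1", "customerName": "Alice"})
window.add(10_000, {"orderNo": "order-2", "customerName": "Bob"})

overdue_at_30s = window.advance(30_000)  # only order-1 has expired
overdue_at_40s = window.advance(40_000)  # order-2 expires 30 s after it arrived
```

Each order leaves the window exactly 30 seconds after it entered, which is why events on overdueDeliveries can be treated as "30 seconds have passed since this order was placed".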
from overdueDeliveries as overdueStream unidirectional join pizza_deliveries on pizza_deliveries.order_id == overdueStream.orderNo select count(overdueStream.orderNo) as sumOrderId, overdueStream.customerName insert into deliveredOrders;
The above query,
- Joins the pizza_deliveries table with the overdueDeliveries stream (the alias overdueStream is used for convenience here)
- Uses a join operation to compare each event in the event table (pizza_deliveries) with each incoming event from the overdueDeliveries stream
  Note that,
  - Events received through the overdueDeliveries stream arrive 30 seconds after the actual order happens
  - A result of this join operation indicates (since we match the order ids) that the order is already delivered within 30 seconds
- Inserts the successful delivery information to the deliveredOrders stream, which can be used to monitor the status of orders
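The join described above can be sketched in Python as a lookup of each overdue event against the stored deliveries. This is an illustrative model only: the function name and sample data are hypothetical, and it assumes that count() behaves as a running total over all joined events seen so far.

```python
def join_overdue_with_deliveries(overdue_events, pizza_deliveries):
    """Model of the unidirectional join: only events arriving on
    overdueDeliveries trigger a lookup in the pizza_deliveries table."""
    delivered_orders = []
    match_count = 0  # assumed running total behind count(overdueStream.orderNo)
    for overdue in overdue_events:
        for row in pizza_deliveries:
            if row["order_id"] == overdue["orderNo"]:
                match_count += 1
                delivered_orders.append({
                    "sumOrderId": match_count,
                    "customerName": overdue["customerName"],
                })
    return delivered_orders


# Made-up sample data: order-2 has no delivery record yet.
deliveries = [
    {"deliveredTime": 12_000, "order_id": "order-1"},
    {"deliveredTime": 25_000, "order_id": "order-3"},
]
overdue = [
    {"orderNo": "order-1", "customerName": "Alice"},
    {"orderNo": "order-2", "customerName": "Bob"},
    {"orderNo": "order-3", "customerName": "Carol"},
]

delivered = join_overdue_with_deliveries(overdue, deliveries)
```

In this sketch only Alice and Carol appear in the result, because only their orders have a matching row in the table by the time the 30-second window expires.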
Prerequisites
See Prerequisites in CEP Samples Setup page.
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0108. For instructions, see Starting sample CEP configurations. This sample configuration does the following:
- Creates the <CEP_HOME>/repository/conf/stream-manager-config.xml file, which is used to create the stream definitions for the sample.
- Points the default Axis2 repo to <CEP_HOME>/sample/artifacts/0108 (by default, the Axis2 repo is <CEP_HOME>/repository/deployment/server).
Executing the sample
1. Open a terminal, go to <CEP_HOME>/samples/consumers/wso2-event and run ant from there.
   It builds the sample wso2event consumer and executes it.
   Do not close this terminal. It is required to keep the server running and receiving events.
2. Open another terminal, go to <CEP_HOME>/samples/producers/pizza-shop and run the following command:
   ant pizzaPublisherClient
   It builds and runs the wso2event producer, which sends sample pizza orders and delivery data to the CEP server.
3. From the terminal opened in step 2, see the details of the events sent. Note that it takes around 30 seconds to complete sending the events as we do time-based analysis in this sample.
To configure the host, port, username, password and number of events, use -Dhost=xxxx -Dport=xxxx -Dusername=xxxx -Dpassword=xxxx -Devents=xx.
For example: ant -Devents=10
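When several of these properties are set at once, it can help to assemble the command programmatically. The following is a minimal sketch: the helper name `build_publisher_command` is hypothetical, the host and port values are placeholder assumptions, and combining the -D properties with the pizzaPublisherClient target in a single invocation is also an assumption based on the two commands shown above.

```python
import shlex


def build_publisher_command(host=None, port=None, username=None,
                            password=None, events=None,
                            target="pizzaPublisherClient"):
    """Build an ant argument list, adding only the properties that are set."""
    options = {
        "host": host,
        "port": port,
        "username": username,
        "password": password,
        "events": events,
    }
    args = ["ant"]
    args += [f"-D{name}={value}"
             for name, value in options.items() if value is not None]
    args.append(target)
    return args


# Placeholder values; replace them with the settings of your own server.
command = build_publisher_command(host="localhost", port=7611, events=10)
print(shlex.join(command))
```

The returned list can be passed to `subprocess.run(command, cwd=...)` from the producer directory, or the printed line can be pasted into a terminal.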
For example, given below is part of the console output of the consumer when sending events from the producer.