Introduction
This sample demonstrates how to use external time windows for a fraud detection use-case. In this sample, we look for two or more transactions done within a very short period of time and send an alert immediately when such an occurrence is detected.
The query used in this sample is as follows:
from atm_transactions#window.externalTime(meta_timestamp, 60 sec) select cardNumber, count(cardNumber) as transactionCount, sum(amount) as totalAmount group by cardNumber insert current events into transactions_per_card ; from transactions_per_card[transactionCount > 1] select cardNumber, transactionCount, totalAmount insert into alert_stream;
The first query uses a 60-second external time window, which keeps events based on the time of the meta_timestamp attribute. Upon arrival of each new event, it gets a count of the transactions so far (last 60 seconds), sum of the amount per each card and emits the results to an intermediate stream named transactions_per_card.
The second query looks for the condition where more than one transaction has taken place for a specific card and sends an alert.
Prerequisites
See Prerequisites in CEP Samples Setup page.
Before starting the server, you need to add the MySQL JDBC driver Download it and place the jar inside <CEP_HOME>/repository/components/lib. You also need to have MySQL installed and running before executing this sample. Create a new database in MySQL which is to be used when creating the cepSampleDataSource below.
Start the server normally and create a new data source named 'cepSampleDataSource' using the Configure --> Data Sources page in the management console. for more information on adding a datasource see Adding Datasources. Once this is done, shutdown the server. (Here cepSampleDataSource ponits to the newly created or already existing database)
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0109. For instructions, see Starting sample CEP configurations. This sample configuration does the following:
- Creates
<CEP_HOME>/repository/conf/data-bridge/stream-definitions.xml
file, which is used to create the stream definitions for the sample. - Points the default Axis2 repo to
<CEP_HOME>/sample/artifacts/0109
(by default, the Axis2 repo is<CEP_HOME>/repository/deployment/server
).
Executing the sample
Open a terminal, go to
<CEP_HOME>/samples/consumers/wso2-event
and run ant from there.
It builds the samplewso2event
consumer and executes it.Do not close this terminal. It is required to keep the server running and receiving events.
Open another terminal, go to
<CEP_HOME>/samples/producers/pizza-shop
and run the following command:ant pizzaPublisherClient
It builds and runs the
wso2event
producer, which sends sample pizza orders and delivery data to the CEP server.From the terminal opened in step 2, see the details of the events sent. Note that it takes around 30 seconds to complete sending the events as we do time-based analysis in this sample.
To configure host, port, username, password and No. of events use -Dhost=xxxx -Dport=xxxx -Dusername=xxxx -Dpassword=xxxx -Devents=xx.
For example : ant -Devents=10
For example, given below is part of the console output of the consumer when sending events from the producer.
4. Log in to MySQL check the database created when setting up the sample. It should contain a new table named 'cepSampleTable' with a few events stored inside it.