This documentation is for WSO2 Complex Event Processor 2.0.0. View documentation for the latest release.

Fraud Analyzer

Analyze the purchases of a credit card over last 100 minutes done from two locations to identify potential frauds. Purchase details of two locations will be published to two different CEP servers. Those purchase details will be searched for a pattern to identify frauds. Pattern used in this example is a small valued purchase followed by a large valued purchase within a time frame of 100 minutes.

This sample is implemented using JMS broker using Siddhi engine.

Follow the below steps to run the sample


Configure and run ActiveMQ

Step 1:

Download the ActiveMQ from " http://activemq.apache.org/activemq-543-release.html "
unzip the distribution and run the ActiveMQ server using the command “./activemq console” from apache-activemq-xxx/bin

Step 2:

 Unzip CEP server  

  Step 3: 

 Copy paste activemq-all-xxx.jar from the ActiveMQ home directory to wso2cep-2.0.0/samples/lib directory. 
 Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from apache-activemq-xxx/lib
 to wso2cep-2.0.0/repository/components/lib directory

Create JMS Broker

Before creating the bucket to filter purchases, it is essential to have a broker adaptor. Since In this example we are going to use JMS Broker, it is needed to create a broker with type jms. For more information about brokers visit here.

Step 1 :

          Start CEP Server and log in as admin

Step 2 :

          In the Configure menu you can find a Menu item called "Broker" and under that you can see sub menu 'Add' and click on that.

Step 3 :

You will get a page with header "Create a New Broker" and you need to enter following details in that form to create a jms broker

Broker Name  : activemqJmsBroker
Broker Type  : jms
JNDI Name    : org.apache.activemq.jndi.ActiveMQInitialContextFactory
User Name    : admin
Password     : admin
Provider URL : tcp://localhost:61616

Finally click on Add Broker button and you will get the added broker to the list of available brokers.

 

Create Bucket with Siddhi

Use the "Add" menu in CEP Buckets to add buckets. There are three main states in bucket creation namely, basic information, inputs, queries. Follow the below guide to create the bucket. Screen-shots are provide for your convenience. If you need more information about buckets visit here.

State 1 - Basic Information


Use the following information to fill the basic information section. 

  Bucket Name (Name of the bucket)                      : FraudAnalyzer
  Description (Description about the bucket)            : Analyze the purchases over last 10 minutes and outputs purchase counts and sum.

  Engine Provider(CEP Runtime engine to be used)        : SiddhiCEPRuntime [Choose from the drop down]

  Persistence snapshot time interval in minutes         : 0
  Enable distributed processing                         : true

State 2 - Inputs

Use following information to fill the input section of bucket.

        Topic( topic to events be received)                : PurchaseUS
        Broker Name (Broker to be used)                    : activemqJmsBroker 
        Mapping Stream (Name of the event stream)          : purchase 
        Query Event Type                                   : Tuple
        Mapping Type                                       : Map Mapping

  Input properties 

      
Property
        Name         	: price  
        Input Name 		: price  
        type            : java.lang.Float[Choose from the drop down]

Property       
        Name         	: cardNo  
        Input Name 		: cardNo 
        type            : java.lang.String[Choose from the drop down]
 
Click on add button to add the defined property to the input and it will be appeared in the Properties table once it added


After filling all required fields click on the add button to add Input to the Bucket. Once you clicked the input form will disappear and the added input will be appeared in Inputs tabl

State 3 - Queries

Use information given below to fill the query section of the bucket. Screen-shot is provided for your convenience.

Defining query : Here we define the basic query.

Query Name (To identify the query) : PotentialFraudQuery 
Expression : from every p1 = purchase[100>price] -> p2 = purchase[price>100000 and cardNo==p1.cardNo] within 6000000   
             insert into potentialFraud p1.cardNo as cardNo,  p1.price as price1, p2.price as price2

Defining the Output : Here we are defining the topic which filtered events are published and the structure of the output xml even

               
    Topic(topic which filtered event published)  : PotentialFraud
    BrokerName (Broker to be used)               : activemqJmsBroker
    Output Mapping                               :  Map Mapping

	Properties
                   Name     : cardNo 
                   Value Of : cardNo

                   Name      : price1  
                   Value Of  : price1  
After filling all the required fields of the Query , click on add query button to add Query to the bucket. Once you clicked it will disappear the query form and added query will be appeared in Queries table.  

As the last step of adding a bucket , click on save button. Once you clicked it will redirect to the deployed buckets page and deployed bucket will be appeared in buckets table. Further to check whether you have entered the details correctly you can click on the link with the defined bucket and it will show added details. You can come back to the previous adding bucket page by clicking on the back button provided or if needed to edit the bucket you can click on the "Edit" link on the top of the View bucket Page

Create Another CEP Node

Step  1:  Unzip another   CEP Server and before starting the Server  changed wso2cep-2.0.0-1/repository/conf/carbon.xml

            <Offset>0</Offset>     to     <Offset>1</Offset>

           This is to overcome server port conflicts  .

Step 2 : start the CEP Server

Step 3 : create another Bucket with input topic = "PurchaseUK" and other details as same as earlier bucket.

Invoking Deployed Bucket 

It is needed to have a JMS Publisher and a JMS Subscriber to invoke the created bucket by sending inputs and receive them.

Creating JMS Publisher 

     JMS ActiveMQ Broker is used in this sample to publish and subscribe events.    

      To create the JMS Publisher which publishes inputs, it is necessary to have following classes and property files in your project    

  • jndi.properties
  • JNDIContext.java
  • PurchasePublisherUS.java
  • PurchasePublisherUK.java  
  • activemq-all-xxx.jar  should be in your class path
        
    jndi.properties   should include the following line
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory


JNDIContext
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JNDIContext {
	private InitialContext initContext = null;
	private TopicConnectionFactory topicConnectionFactory = null;
	public static JNDIContext instance = null;

	private JNDIContext() {
		createInitialContext();
		createConnectionFactory();
	} 

	public InitialContext getInitContext() {
		return initContext;
	}

	public TopicConnectionFactory getTopicConnectionFactory() {
		return topicConnectionFactory;
	}

	public static JNDIContext getInstance() {
		if (instance == null) {
			instance = new JNDIContext();
		}
		return instance;
	}

 

	/**
	* Create Connection factory with initial context
	*/
	private void createConnectionFactory() {

		// create connection factory
		try {
			topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
		} catch (NamingException e) {
			System.out.println("Can not create topic connection factory." + e);
		}
	}

 

	/**
	* Create Initial Context with given configuration
	*/
	private void createInitialContext() {

		try {
			initContext = new InitialContext();
		} catch (NamingException e) {
			System.out.println("Can not create initial context with given parameters." + e);
		}
	}
} 

  Below class will publish the purchase details from US to CEP server

Purchase Publisher US
package org.wso2.carbon.cep.sample.jms;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.stream.XMLStreamException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


/**
 * Publish message to the topic created when defining Input
 */


public class PurchasesPublisherUS {

	private static InitialContext initContext = null;
	private static TopicConnectionFactory topicConnectionFactory = null;
	
	public static void main(String[] args) throws XMLStreamException, InterruptedException, NamingException {
		Properties initialContextProperties = new Properties();
		initialContextProperties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
		initContext = new InitialContext(initialContextProperties);
		topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
		PurchasesPublisherUS publisher = new PurchasesPublisherUS();
		String topicName = "PurchaseUS";

 	        Map<String, Object> map1 = new HashMap<String, Object>();
	        map1.put("cardNo", "1234-3244-2432-4124");
	        map1.put("price", 26.36f);
	        publisher.publish(topicName, map1);
	        System.out.println("Purchase Message 1 sent");
		Thread.sleep(2000);

        	Map<String, Object> map2 = new HashMap<String, Object>();
		map2.put("cardNo", "3234-3244-2432-4124");
		map2.put("price", 73.36f);
		publisher.publish(topicName, map2);
		System.out.println("Purchase Message 2 sent");
		Thread.sleep(2000);

 	        Map<String, Object> map4 = new HashMap<String, Object>();
		map4.put("cardNo", "1244-4244-2442-4124");
		map4.put("price", 643252216.46f);
		publisher.publish(topicName, map4);
	        System.out.println("Purchase Message 3 sent");
		Thread.sleep(2000)

  	}

	/*  Publish message to given topic
	 * @param topicName - topic name to publish messages
	 * @param message   - message to send
	*/

 	public void publish(String topicName, Map<String, Object> message) {

       		// create topic connection
		TopicConnection topicConnection = null;
		try {
			topicConnection = topicConnectionFactory.createTopicConnection();
			topicConnection.start();
		} catch (JMSException e){
			System.out.println("Can not create topic connection." + e);
			 return;
		}
		Session session = null;
		try {
			session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		        Topic topic = session.createTopic(topicName);
		        MessageProducer producer = session.createProducer(topic);
		        MapMessage jmsMessage = session.createMapMessage();
		        for (Map.Entry<String, Object> entry : message.entrySet()) {
	                	if (entry.getValue() instanceof Double) {
					jmsMessage.setDouble(entry.getKey(), (Double) entry.getValue());
			        } else if (entry.getValue() instanceof Integer) {
			                jmsMessage.setInt(entry.getKey(), (Integer) entry.getValue());
		                } else if (entry.getValue() instanceof Long) {
					jmsMessage.setLong(entry.getKey(), (Long) entry.getValue());
				} else if (entry.getValue() instanceof Float) {
				        jmsMessage.setFloat(entry.getKey(), (Float) entry.getValue());
				} else if (entry.getValue() instanceof String) {
				        jmsMessage.setString(entry.getKey(), (String) entry.getValue());
				}
            		}
			producer.send(jmsMessage);
		        producer.close();
		        session.close();
		        topicConnection.stop();
		        topicConnection.close();
          	} catch (JMSException e) {
		        System.out.println("Can not subscribe." + e);
	        }
	}
}

Below class will publish the purchase details from UK to CEP server

Purchase Publisher UK
package org.wso2.carbon.cep.sample.jms;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.stream.XMLStreamException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
* Publish message to the topic created when defining Input
*/

public class PurchasesPublisherUK {
	private static InitialContext initContext = null;
	private static TopicConnectionFactory topicConnectionFactory = null;

	public static void main(String[] args)
            throws XMLStreamException, InterruptedException, NamingException {
		Properties initialContextProperties = new Properties();
		initialContextProperties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
  	        initContext = new InitialContext(initialContextProperties);
 	        topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
	        PurchasesPublisherUK publisher = new PurchasesPublisherUK();
	        String topicName = "PurchaseUK";

     		Map<String, Object> map3 = new HashMap<String, Object>();
		map3.put("cardNo", "1234-3244-2432-4124");
	        map3.put("price", 23242346.66f);
	        publisher.publish(topicName, map3);
                System.out.println("Purchase Message 1 sent");
                Thread.sleep(2000);
		
        	Map<String, Object> map5 = new HashMap<String, Object>();
	        map5.put("cardNo", "1254-5244-2452-4124");
	        map5.put("price", 2906.56f);
	        publisher.publish(topicName, map5);
	        System.out.println("Purchase Message 2 sent");
	        Thread.sleep(2000);
	
          	Map<String, Object> map6 = new HashMap<String, Object>();
	        map6.put("cardNo", "3234-3244-2432-4124");
	        map6.put("price", 23242346.66f);
	        publisher.publish(topicName, map6);
	        System.out.println("Purchase Message 3 sent");
	        Thread.sleep(2000);
		
         	Map<String, Object> map7 = new HashMap<String, Object>();
	        map7.put("cardNo", "1244-4244-2442-4124");
	        map7.put("price", 56.76f);
	        publisher.publish(topicName, map7);
	        System.out.println("Purchase Message 4 sent");
	 }


	/**
	* Publish message to given topic
	*
	* @param topicName - topic name to publish messages
	* @param message   - message to send
	*/
	public void publish(String topicName, Map<String, Object> message) {
	        // create topic connection
		TopicConnection topicConnection = null;
	        try {
            		topicConnection = topicConnectionFactory.createTopicConnection();
            		topicConnection.start();
        	} catch (JMSException e) {
			System.out.println("Can not create topic connection." + e);
		         return;
		}
		Session session = null;
		try {
            		session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
 			Topic topic = session.createTopic(topicName);
			MessageProducer producer = session.createProducer(topic);
			MapMessage jmsMessage = session.createMapMessage();
			for (Map.Entry<String, Object> entry : message.entrySet()) {
				if (entry.getValue() instanceof Double) {
					jmsMessage.setDouble(entry.getKey(), (Double) entry.getValue());
 				} else if (entry.getValue() instanceof Integer) {
					jmsMessage.setInt(entry.getKey(), (Integer) entry.getValue());
				} else if (entry.getValue() instanceof Long) {
					jmsMessage.setLong(entry.getKey(), (Long) entry.getValue());
				} else if (entry.getValue() instanceof Float) {
					jmsMessage.setFloat(entry.getKey(), (Float) entry.getValue());
				} else if (entry.getValue() instanceof String) {
					jmsMessage.setString(entry.getKey(), (String) entry.getValue())
				}
			}
			producer.send(jmsMessage);
			producer.close();
			session.close();
			topicConnection.stop();
 			topicConnection.close();
		} catch (JMSException e) {
			System.out.println("Can not subscribe." + e);
	  	}
 	}
}

Create a project with above classes and files and run both publishers to publish events to the topic.

Creating JMS Subscribe

JMS Subscriber will receive events published to the output topic of the query. In order to create a JMS Subscriber , you need to have above jndi.properties file and JNDIContext.java class and Subscriber.java class, PotentialFraudSubscriber.java

Subscriber
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Enumeration;

 

/**
* Subscribe to myTopic and wait 10seconds to receive messages
*/
public class Subscriber implements MessageListener {
	private static InitialContext initContext = null;
	private static TopicConnectionFactory topicConnectionFactory = null;
	private boolean messageReceived = false;
	// static String TOPIC = "ConditionSatisfyingStockQuotes";
	// public static void main(String[] args) throws XMLStreamException {
	// new Subscriber().subscribe(TOPIC);
	// }

	public void subscribe(String topicName) throws NamingException {

 		initContext = JNDIContext.getInstance().getInitContext();
		topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
		// create connection
		TopicConnection topicConnection = null;
		try {
			topicConnection = topicConnectionFactory.createTopicConnection();
		} catch (JMSException e) {
			System.out.println("Can not create topic connection." + e);
			return;
		}
		// create session, subscriber, message listener and listen on that topic
		TopicSession session = null;
		try {
			session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
			// change topic name to brokerConfiguration name + topic name
			Topic topic = session.createTopic(topicName);
			TopicSubscriber subscriber = session.createSubscriber(topic);
			subscriber.setMessageListener(this);
			topicConnection.start();
			synchronized (this) {
				while (!messageReceived) {
					try {
						this.wait();
					} catch (InterruptedException e) {
					}
				}
			}

 		} catch (JMSException e) {
			System.out.println("Can not subscribe." + e);
		}
	}
	
	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			TextMessage textMessage = (TextMessage) message;
			try {
				System.out.println("output = " + textMessage.getText());
				synchronized (this) {
					messageReceived = true;
				}
			} catch (JMSException e) {
				System.out.println("error at getting text out of received message. = " + e);
			}
		} else if (message instanceof MapMessage) {
			try {
				Enumeration enumeration = ((MapMessage) message).getMapNames();
				for (; enumeration.hasMoreElements(); ) {
					System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
				}
				System.out.println();
			} catch (JMSException e) {
				System.out.println("error at getting element out of received map message. = " + e);
			}
		} else {
			System.out.println("Received message is not a text/map message.");
		}
	}
}

 

Potential Fraud Subscriber
package org.wso2.carbon.cep.sample.jms

import javax.naming.NamingException;

public class PotentialFraudSubscriber {

    public static void main(String[] args) throws NamingException {

        new Subscriber().subscribe("PotentialFraud");

    }

}