Analyze the credit card purchases made from two locations over the last 100 minutes to identify potential fraud. Purchase details from the two locations are published to two different CEP servers, which search them for a pattern that indicates fraud. The pattern used in this example is a small-value purchase followed by a large-value purchase within a time frame of 100 minutes.
This sample is implemented with a JMS broker and the Siddhi engine.
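To make the pattern concrete, here is a minimal plain-Java sketch of "small purchase followed by large purchase on the same card within 100 minutes". It is not Siddhi and not the sample's bucket query; the class name and both price thresholds are hypothetical, chosen only for illustration, and the sketch ignores the two-location setup.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration of "small purchase followed by large purchase". */
class FraudPatternSketch {
    // Hypothetical thresholds; the sample's real values belong to the bucket query.
    static final double SMALL_LIMIT = 100.0;
    static final double LARGE_LIMIT = 10_000.0;
    static final long WINDOW_MILLIS = 100L * 60 * 1000; // 100 minutes

    // Timestamp of the most recent small purchase per card number.
    private final Map<String, Long> lastSmallPurchase = new HashMap<>();

    /** Returns true when this purchase completes the pattern for its card. */
    boolean onPurchase(String cardNo, double price, long timestampMillis) {
        if (price < SMALL_LIMIT) {
            lastSmallPurchase.put(cardNo, timestampMillis);
            return false;
        }
        if (price > LARGE_LIMIT) {
            // A large purchase consumes the remembered small purchase, if any.
            Long smallAt = lastSmallPurchase.remove(cardNo);
            return smallAt != null && timestampMillis - smallAt <= WINDOW_MILLIS;
        }
        return false;
    }

    public static void main(String[] args) {
        FraudPatternSketch sketch = new FraudPatternSketch();
        System.out.println(sketch.onPurchase("1234-3244-2432-4124", 26.36, 0L));
        System.out.println(sketch.onPurchase("1234-3244-2432-4124", 23242346.66, 2_000L));
    }
}
```

With these assumed thresholds, the first call only records the small purchase and the second call reports a match, because both purchases use the same card and are two seconds apart.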
Follow the steps below to run the sample.
Step 1:
Download ActiveMQ from http://activemq.apache.org/activemq-543-release.html
Unzip the distribution and run the ActiveMQ server with the command "./activemq console" from apache-activemq-xxx/bin
Step 2:
Unzip the CEP server.
Step 3:
Copy activemq-all-xxx.jar from the ActiveMQ home directory to the wso2cep-2.0.0/samples/lib directory.
Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from apache-activemq-xxx/lib to the wso2cep-2.0.0/repository/components/lib directory.
Create JMS Broker
Before creating the bucket that filters purchases, you need a broker adaptor. Since this example uses a JMS broker, create a broker of type jms. For more information about brokers, see the broker documentation.
Step 1:
Start the CEP server and log in as admin.
Step 2:
In the Configure menu, find the "Broker" menu item and click its "Add" sub-menu.
Step 3:
A page titled "Create a New Broker" appears. Enter the following details in the form to create a JMS broker.
Finally, click the Add Broker button; the new broker appears in the list of available brokers.
Create Bucket with Siddhi
Use the "Add" menu under CEP Buckets to add a bucket. Bucket creation has three main stages: basic information, inputs, and queries. Follow the guide below to create the bucket. Screenshots are provided for your convenience. For more information about buckets, see the bucket documentation.
Use the following information to fill in the basic information section.
Use the following information to fill in the input section of the bucket.
Input properties
Stage 3 - Queries
Use the information given below to fill in the query section of the bucket. A screenshot is provided for your convenience.
Defining the query: here we define the basic query.
Defining the output: here we define the topic to which filtered events are published and the structure of the output XML event.
Create Another CEP Node
Step 1: Unzip another CEP server and, before starting it, make the following change in wso2cep-2.0.0-1/repository/conf/carbon.xml:
<Offset>0</Offset> to <Offset>1</Offset>
This avoids port conflicts between the two servers.
Step 2: Start the second CEP server.
Step 3: Create another bucket with input topic = "PurchaseUK" and all other details the same as in the earlier bucket.
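The effect of the port offset from Step 1 can be illustrated with a small calculation. The sketch below assumes the stock Carbon defaults of 9443 (HTTPS) and 9763 (HTTP); these values are not stated in this document, so check them against your own server's startup log. The class and method names are hypothetical.

```java
/** Shows how a port offset shifts a server's listening ports. */
class PortOffsetSketch {
    // Assumed default ports of an unmodified Carbon server.
    static final int DEFAULT_HTTPS_PORT = 9443;
    static final int DEFAULT_HTTP_PORT = 9763;

    /** The offset is simply added to each default port. */
    static int effectivePort(int defaultPort, int offset) {
        return defaultPort + offset;
    }

    public static void main(String[] args) {
        int offset = 1; // value placed in <Offset> for the second node
        System.out.println("HTTPS: " + effectivePort(DEFAULT_HTTPS_PORT, offset));
        System.out.println("HTTP:  " + effectivePort(DEFAULT_HTTP_PORT, offset));
    }
}
```

Under these assumptions the second node would listen one port above the first on each protocol, which is why both servers can run on the same machine.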
Invoking Deployed Bucket
A JMS publisher and a JMS subscriber are needed to invoke the bucket: the publisher sends the input events and the subscriber receives the output events.
Creating JMS Publisher
The ActiveMQ JMS broker is used in this sample to publish and subscribe to events.
To create the JMS publisher that publishes the inputs, your project needs the following classes and property files:
- jndi.properties
- JNDIContext.java
- PurchasesPublisherUS.java
- PurchasesPublisherUK.java
- activemq-all-xxx.jar (on your classpath)
jndi.properties should include the following line
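JNDIContext.java creates the JNDI initial context and looks up the topic connection factory:
package org.wso2.carbon.cep.sample.jms;
import javax.jms.TopicConnectionFactory;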
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JNDIContext {
private InitialContext initContext = null;
private TopicConnectionFactory topicConnectionFactory = null;
private static JNDIContext instance = null;
private JNDIContext() {
createInitialContext();
createConnectionFactory();
}
public InitialContext getInitContext() {
return initContext;
}
public TopicConnectionFactory getTopicConnectionFactory() {
return topicConnectionFactory;
}
public static synchronized JNDIContext getInstance() {
if (instance == null) {
instance = new JNDIContext();
}
return instance;
}
/**
* Create Connection factory with initial context
*/
private void createConnectionFactory() {
// create connection factory
try {
topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
} catch (NamingException e) {
System.out.println("Can not create topic connection factory." + e);
}
}
/**
* Create Initial Context with given configuration
*/
private void createInitialContext() {
try {
initContext = new InitialContext();
} catch (NamingException e) {
System.out.println("Can not create initial context with given parameters." + e);
}
}
}
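JNDIContext calls the no-argument InitialContext constructor, which reads its settings from a jndi.properties file on the classpath. The sketch below is one hedged way to sanity-check such a file before starting the publishers. The class name and the sample text are hypothetical; the factory class is the one the publishers in this sample use, while the provider URL is an assumed local broker address, not a value taken from this document.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Checks jndi.properties-style text for the initial context factory key. */
class JndiPropertiesCheck {
    static final String FACTORY_KEY = "java.naming.factory.initial";

    /** Returns true when the text sets a non-empty initial context factory. */
    static boolean hasInitialFactory(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String factory = props.getProperty(FACTORY_KEY);
        return factory != null && !factory.trim().isEmpty();
    }

    public static void main(String[] args) {
        // Hypothetical file content, for illustration only.
        String sample =
            "java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory\n"
          + "java.naming.provider.url=tcp://localhost:61616\n";
        System.out.println(hasInitialFactory(sample));
    }
}
```

If the factory key is missing, creating the context may still succeed, but the lookup of "ConnectionFactory" in createConnectionFactory() would be expected to fail with a NamingException.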
The class below publishes purchase details from the US to the CEP server.
package org.wso2.carbon.cep.sample.jms;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.stream.XMLStreamException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Publish message to the topic created when defining Input
*/
public class PurchasesPublisherUS {
private static InitialContext initContext = null;
private static TopicConnectionFactory topicConnectionFactory = null;
public static void main(String[] args) throws XMLStreamException, InterruptedException, NamingException {
Properties initialContextProperties = new Properties();
initialContextProperties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
initContext = new InitialContext(initialContextProperties);
topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
PurchasesPublisherUS publisher = new PurchasesPublisherUS();
String topicName = "PurchaseUS";
Map<String, Object> map1 = new HashMap<String, Object>();
map1.put("cardNo", "1234-3244-2432-4124");
map1.put("price", 26.36f);
publisher.publish(topicName, map1);
System.out.println("Purchase Message 1 sent");
Thread.sleep(2000);
Map<String, Object> map2 = new HashMap<String, Object>();
map2.put("cardNo", "3234-3244-2432-4124");
map2.put("price", 73.36f);
publisher.publish(topicName, map2);
System.out.println("Purchase Message 2 sent");
Thread.sleep(2000);
Map<String, Object> map4 = new HashMap<String, Object>();
map4.put("cardNo", "1244-4244-2442-4124");
map4.put("price", 643252216.46f);
publisher.publish(topicName, map4);
System.out.println("Purchase Message 3 sent");
Thread.sleep(2000);
}
/**
* Publish message to given topic
* @param topicName - topic name to publish messages
* @param message - message to send
*/
public void publish(String topicName, Map<String, Object> message) {
// create topic connection
TopicConnection topicConnection = null;
try {
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.start();
} catch (JMSException e){
System.out.println("Can not create topic connection." + e);
return;
}
Session session = null;
try {
session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
MapMessage jmsMessage = session.createMapMessage();
for (Map.Entry<String, Object> entry : message.entrySet()) {
if (entry.getValue() instanceof Double) {
jmsMessage.setDouble(entry.getKey(), (Double) entry.getValue());
} else if (entry.getValue() instanceof Integer) {
jmsMessage.setInt(entry.getKey(), (Integer) entry.getValue());
} else if (entry.getValue() instanceof Long) {
jmsMessage.setLong(entry.getKey(), (Long) entry.getValue());
} else if (entry.getValue() instanceof Float) {
jmsMessage.setFloat(entry.getKey(), (Float) entry.getValue());
} else if (entry.getValue() instanceof String) {
jmsMessage.setString(entry.getKey(), (String) entry.getValue());
}
}
producer.send(jmsMessage);
producer.close();
session.close();
topicConnection.stop();
topicConnection.close();
} catch (JMSException e) {
System.out.println("Can not publish." + e);
}
}
}
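The instanceof chain in publish() decides which MapMessage setter is used for each map entry, and it has no final else branch. The following sketch mirrors that chain without any JMS dependency to show which setter each value type would reach. The class name, the setterFor helper, and the extra "verified" field are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Mirrors the publisher's type dispatch without needing a JMS library. */
class MapMessageDispatchSketch {
    /** Names the setter the instanceof chain would pick, or null if none applies. */
    static String setterFor(Object value) {
        if (value instanceof Double) return "setDouble";
        if (value instanceof Integer) return "setInt";
        if (value instanceof Long) return "setLong";
        if (value instanceof Float) return "setFloat";
        if (value instanceof String) return "setString";
        return null; // no branch matches, so such an entry is skipped
    }

    public static void main(String[] args) {
        Map<String, Object> purchase = new LinkedHashMap<>();
        purchase.put("cardNo", "1234-3244-2432-4124");
        purchase.put("price", 26.36f);
        purchase.put("verified", Boolean.TRUE); // hypothetical extra field
        for (Map.Entry<String, Object> entry : purchase.entrySet()) {
            System.out.println(entry.getKey() + " -> " + setterFor(entry.getValue()));
        }
    }
}
```

The sample's own fields are covered (cardNo is a String, price is a Float), but a value of any other type, such as the Boolean here, would be left out of the message without an error.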
The class below publishes purchase details from the UK to the CEP server.
package org.wso2.carbon.cep.sample.jms;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.stream.XMLStreamException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Publish message to the topic created when defining Input
*/
public class PurchasesPublisherUK {
private static InitialContext initContext = null;
private static TopicConnectionFactory topicConnectionFactory = null;
public static void main(String[] args)
throws XMLStreamException, InterruptedException, NamingException {
Properties initialContextProperties = new Properties();
initialContextProperties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
initContext = new InitialContext(initialContextProperties);
topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
PurchasesPublisherUK publisher = new PurchasesPublisherUK();
String topicName = "PurchaseUK";
Map<String, Object> map3 = new HashMap<String, Object>();
map3.put("cardNo", "1234-3244-2432-4124");
map3.put("price", 23242346.66f);
publisher.publish(topicName, map3);
System.out.println("Purchase Message 1 sent");
Thread.sleep(2000);
Map<String, Object> map5 = new HashMap<String, Object>();
map5.put("cardNo", "1254-5244-2452-4124");
map5.put("price", 2906.56f);
publisher.publish(topicName, map5);
System.out.println("Purchase Message 2 sent");
Thread.sleep(2000);
Map<String, Object> map6 = new HashMap<String, Object>();
map6.put("cardNo", "3234-3244-2432-4124");
map6.put("price", 23242346.66f);
publisher.publish(topicName, map6);
System.out.println("Purchase Message 3 sent");
Thread.sleep(2000);
Map<String, Object> map7 = new HashMap<String, Object>();
map7.put("cardNo", "1244-4244-2442-4124");
map7.put("price", 56.76f);
publisher.publish(topicName, map7);
System.out.println("Purchase Message 4 sent");
}
/**
* Publish message to given topic
*
* @param topicName - topic name to publish messages
* @param message - message to send
*/
public void publish(String topicName, Map<String, Object> message) {
// create topic connection
TopicConnection topicConnection = null;
try {
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.start();
} catch (JMSException e) {
System.out.println("Can not create topic connection." + e);
return;
}
Session session = null;
try {
session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
MapMessage jmsMessage = session.createMapMessage();
for (Map.Entry<String, Object> entry : message.entrySet()) {
if (entry.getValue() instanceof Double) {
jmsMessage.setDouble(entry.getKey(), (Double) entry.getValue());
} else if (entry.getValue() instanceof Integer) {
jmsMessage.setInt(entry.getKey(), (Integer) entry.getValue());
} else if (entry.getValue() instanceof Long) {
jmsMessage.setLong(entry.getKey(), (Long) entry.getValue());
} else if (entry.getValue() instanceof Float) {
jmsMessage.setFloat(entry.getKey(), (Float) entry.getValue());
} else if (entry.getValue() instanceof String) {
jmsMessage.setString(entry.getKey(), (String) entry.getValue());
}
}
producer.send(jmsMessage);
producer.close();
session.close();
topicConnection.stop();
topicConnection.close();
} catch (JMSException e) {
System.out.println("Can not publish." + e);
}
}
}
Create a project with the above classes and files, then run both publishers to publish events to the topics.
Creating JMS Subscriber
The JMS subscriber receives the events published to the output topic of the query. To create it, you need the jndi.properties file and the JNDIContext.java class from above, plus the Subscriber.java and PotentialFraudSubscriber.java classes below.
package org.wso2.carbon.cep.sample.jms;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Enumeration;
/**
* Subscribe to the given topic and print the messages received on it
*/
public class Subscriber implements MessageListener {
private static InitialContext initContext = null;
private static TopicConnectionFactory topicConnectionFactory = null;
private boolean messageReceived = false;
// static String TOPIC = "ConditionSatisfyingStockQuotes";
// public static void main(String[] args) throws XMLStreamException {
// new Subscriber().subscribe(TOPIC);
// }
public void subscribe(String topicName) throws NamingException {
initContext = JNDIContext.getInstance().getInitContext();
topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
// create connection
TopicConnection topicConnection = null;
try {
topicConnection = topicConnectionFactory.createTopicConnection();
} catch (JMSException e) {
System.out.println("Can not create topic connection." + e);
return;
}
// create session, subscriber, message listener and listen on that topic
TopicSession session = null;
try {
session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
// change topic name to brokerConfiguration name + topic name
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);
subscriber.setMessageListener(this);
topicConnection.start();
synchronized (this) {
while (!messageReceived) {
try {
this.wait();
} catch (InterruptedException e) {
// restore the interrupt flag and stop waiting
Thread.currentThread().interrupt();
return;
}
}
}
} catch (JMSException e) {
System.out.println("Can not subscribe." + e);
}
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("output = " + textMessage.getText());
synchronized (this) {
messageReceived = true;
}
} catch (JMSException e) {
System.out.println("error at getting text out of received message. = " + e);
}
} else if (message instanceof MapMessage) {
try {
Enumeration enumeration = ((MapMessage) message).getMapNames();
for (; enumeration.hasMoreElements(); ) {
System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
}
System.out.println();
} catch (JMSException e) {
System.out.println("error at getting element out of received map message. = " + e);
}
} else {
System.out.println("Received message is not a text/map message.");
}
}
}
package org.wso2.carbon.cep.sample.jms;
import javax.naming.NamingException;
public class PotentialFraudSubscriber {
public static void main(String[] args) throws NamingException {
new Subscriber().subscribe("PotentialFraud");
}
}
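The Subscriber above blocks in wait() until messageReceived becomes true. If a bounded wait is preferred, for example giving up after ten seconds, one common alternative is a CountDownLatch. The sketch below shows that technique on its own, without JMS; it is not part of the sample, and the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Waits for a callback with a timeout instead of an open-ended wait(). */
class BoundedWaitSketch {
    private final CountDownLatch received = new CountDownLatch(1);

    /** Would be called from a message callback such as onMessage. */
    void markReceived() {
        received.countDown();
    }

    /** Waits up to the timeout; returns true if a message arrived in time. */
    boolean awaitMessage(long timeout, TimeUnit unit) {
        try {
            return received.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedWaitSketch sketch = new BoundedWaitSketch();
        // Simulate a listener thread delivering one message shortly after start.
        new Thread(sketch::markReceived).start();
        System.out.println("received = " + sketch.awaitMessage(10, TimeUnit.SECONDS));
    }
}
```

In a subscriber, markReceived() would be invoked from onMessage and awaitMessage() from the thread that started the connection, which could then close the connection once the wait ends.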