In this example, we output the the last 10 min average and sum of all stock quotes. This sample is implemented using JMS broker.
Step 1:
Download the ActiveMQ from "http://activemq.apache.org/activemq-543-release.html ".
unzip the distribution and run the ActiveMQ server using the command “./activemq console” from apache-activemq-xxx/bin
Step 2:
Download CEP server and install. Do not start the server. Refer to the Installation Guide for instructions .
Step 3:
Copy paste activemq-all-xxx.jar from the ActiveMQ home directory to wso2cep-2.0.0/samples/lib directory.
Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from apache-activemq-xxx/lib to wso2cep-2.0.0/repository/components/lib directory.
Step 4:
Run WSO2 CEP. Refer to the Installing the Product for instructions .
Create JMS Broker
Before creating the bucket to filter stock quotes and twitter feeds, it is essential to have a Carbon broker. Since in this example we are going to use JMS Broker, it is needed to create a broker with type jms. For more information about brokers visit here.
Step 1 :
Start CEP Server and log in as admin. Refer to the Installing the Product for instructions.
Step 2 :
In the Configure menu you can find a Menu item called "Broker" and under that you can see sub menu 'Add' and click on that.
Step 3 :
You will get a page with header "Create a New Broker" and you need to enter following details in that form to create a jms broker.
Finally click on Add Broker button and you will get the added broker to the list of available brokers.
Create Bucket with Siddhi
Use the "Add" menu in CEP Buckets to add buckets. Use following information and create the bucket. If you need more information about buckets visit here.
Defining Inputs : Click on Add Input link and it will provide a form to define inputs. You can define the sample input by entering following values.
Defining input properties
Section 3 : Queries
Defining the Output : Here we are defining the topic which filtered events are published and the structure of the output xml.
Invoking Deployed Bucket
It is needed to have a JMS Publisher and a JMS Subscriber to invoke the created bucket.
Creating JMS Publisher
JMS ActiveMQ Broker is used in this sample to publish and subscribe events.
To create the JMS Publisher, it is necessary to have following classes and property files in your project.
- JNDI Properties
- JNDIContext.java
- AllStockQuotesPublisher.java
jndi.properties
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
#java.naming.factory.initial =org.apache.qpid.jndi.PropertiesFileInitialContextFactory
#use the following property to configure the default connector
#connectionfactory.ConnectionFactory=amqp://admin:admin@clientid/carbon?brokerlist='tcp://localhost:5673'
# use the following property to specify the JNDI name the connection factory
# should appear as.
#connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactory
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JNDIContext {
private InitialContext initContext = null;
private TopicConnectionFactory topicConnectionFactory = null;
public static JNDIContext instance = null;
private JNDIContext() {
createInitialContext();
createConnectionFactory();
}
public InitialContext getInitContext() {
return initContext;
}
public TopicConnectionFactory getTopicConnectionFactory() {
return topicConnectionFactory;
}
public static JNDIContext getInstance() {
if (instance == null) {
instance = new JNDIContext();
}
return instance;
}
/**
* Create Connection factory with initial context
*/
private void createConnectionFactory() {
// create connection factory
try {
topicConnectionFactory = (TopicConnectionFactory) initContext.lookup("ConnectionFactory");
} catch (NamingException e) {
System.out.println("Can not create topic connection factory." + e);
}
}
/**
* Create Initial Context with given configuration
*/
private void createInitialContext() {
try {
initContext = new InitialContext();
} catch (NamingException e) {
System.out.println("Can not create initial context with given parameters." + e);
}
}
}
Java class to publish stock quote information to CEP server.
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.xml.stream.XMLStreamException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Publish message to the topic created when defining Input
*/
public class AllStockQuotesPublisher {
private static InitialContext initContext = null;
private static TopicConnectionFactory topicConnectionFactory = null;
public static void main(String[] args)
throws XMLStreamException, InterruptedException, NamingException {
Properties initialContextProperties = new Properties();
initialContextProperties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
initContext = new InitialContext(initialContextProperties);
topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
AllStockQuotesPublisher publisher = new AllStockQuotesPublisher();
Map<String, Object> map1 = new HashMap<String, Object>();
map1.put("symbol", "MSFT");
map1.put("price", 26.36);
publisher.publish("AllStockQuotes", map1);
System.out.println("AllStockQuotes Message 1 sent");
Thread.sleep(2000);
Map<String, Object> map2 = new HashMap<String, Object>();
map2.put("symbol", "MSFT");
map2.put("price", 36.36);
publisher.publish("AllStockQuotes", map2);
System.out.println("AllStockQuotes Message 2 sent");
Thread.sleep(2000);
Map<String, Object> map3 = new HashMap<String, Object>();
map3.put("symbol", "MSFT");
map3.put("price", 6.36);
publisher.publish("AllStockQuotes", map3);
System.out.println("AllStockQuotes Message 3 sent");
}
/**
* Publish message to given topic
*
* @param topicName - topic name to publish messages
* @param message - message to send
*/
public void publish(String topicName, Map<String, Object> message) {
// create topic connection
TopicConnection topicConnection = null;
try {
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.start();
} catch (JMSException e) {
System.out.println("Can not create topic connection." + e);
return;
}
Session session = null;
try {
session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
MapMessage jmsMessage = session.createMapMessage();
for (Map.Entry<String, Object> entry : message.entrySet()) {
if (entry.getValue() instanceof Double) {
jmsMessage.setDouble(entry.getKey(), (Double) entry.getValue());
}
else if (entry.getValue() instanceof Integer) {
jmsMessage.setInt(entry.getKey(), (Integer) entry.getValue());
}
else if (entry.getValue() instanceof Long) {
jmsMessage.setLong(entry.getKey(), (Long) entry.getValue());
}
else if (entry.getValue() instanceof Float) {
jmsMessage.setFloat(entry.getKey(), (Float) entry.getValue());
}
else if (entry.getValue() instanceof String) {
jmsMessage.setString(entry.getKey(), (String) entry.getValue());
}
}
producer.send(jmsMessage);
producer.close();
session.close();
topicConnection.stop();
topicConnection.close();
} catch (JMSException e) {
System.out.println("Can not subscribe." + e);
}
}
}
Create a project with above classes and files and run the publisher class to publish events to the topic.
Creating JMS Subscriber
JMS Subscriber will receive events published to the output topic of the query. In order to create a JMS Subscriber , you need to have above jndi.properties file and JNDIContext.java class and Subscriber.java class, AllStockStatsSubscriber.java
Class to subscribe to output by CEP server.
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Enumeration;
/**
* Subscribe to myTopic and wait 10seconds to receive messages
*/
public class Subscriber implements MessageListener {
private static InitialContext initContext = null;
private static TopicConnectionFactory topicConnectionFactory = null;
private boolean messageReceived = false;
// static String TOPIC = "ConditionSatisfyingStockQuotes";
// public static void main(String[] args) throws XMLStreamException {
// new Subscriber().subscribe(TOPIC);
// }
public void subscribe(String topicName) throws NamingException {
initContext = JNDIContext.getInstance().getInitContext();
topicConnectionFactory = JNDIContext.getInstance().getTopicConnectionFactory();
// create connection
TopicConnection topicConnection = null;
try {
topicConnection = topicConnectionFactory.createTopicConnection();
} catch (JMSException e) {
System.out.println("Can not create topic connection." + e);
return;
}
// create session, subscriber, message listener and listen on that topic
TopicSession session = null;
try {
session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
// change topic name to brokerConfiguration name + topic name
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);
subscriber.setMessageListener(this);
topicConnection.start();
synchronized (this) {
while (!messageReceived) {
try {
this.wait();
} catch (InterruptedException e) {
}
}
}
} catch (JMSException e) {
System.out.println("Can not subscribe." + e);
}
}
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("output = " + textMessage.getText());
synchronized (this) {
messageReceived = true;
}
} catch (JMSException e) {
System.out.println("error at getting text out of received message. = " + e);
}
} else if (message instanceof MapMessage) {
try {
Enumeration enumeration = ((MapMessage) message).getMapNames();
for (; enumeration.hasMoreElements(); ) {
System.out.println(((MapMessage) message).getString((String) enumeration.nextElement()));
}
System.out.println();
} catch (JMSException e) {
System.out.println("error at getting element out of received map message. = " + e);
}
} else {
System.out.println("Received message is not a text/map message.");
}
}
}
import javax.naming.NamingException;
public class PredictedStockQuotesSubscriber {
public static void main(String[] args) throws NamingException {
new Subscriber().subscribe("AllStockStats");
}
}