Introduction
This sample demonstrates how to process sensor statistics. It uses a Kafka server as both the input and the output.
The query used in this sample is as follows:
from sensorInput[sensorId == "ID1"] select sensorId, sensorVersion, sensorValue insert into filteredSensorStream;
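To make the effect of the query concrete, here is a minimal Python sketch of the same filter-and-project logic. It is not how CEP executes the query. Events are modeled as plain dictionaries, and the function name, the sample values and the extra sensorName attribute are assumptions made only for this illustration.

```python
from typing import Iterable, Iterator


def filter_sensor_events(events: Iterable[dict], sensor_id: str = "ID1") -> Iterator[dict]:
    """Keep events whose sensorId matches and project the three selected attributes."""
    for event in events:
        if event.get("sensorId") == sensor_id:
            # Mirrors: select sensorId, sensorVersion, sensorValue
            yield {
                "sensorId": event["sensorId"],
                "sensorVersion": event["sensorVersion"],
                "sensorValue": event["sensorValue"],
            }


# Hypothetical input events; values and the sensorName attribute are made up.
sample_events = [
    {"sensorId": "ID1", "sensorVersion": "v1", "sensorValue": 45.5, "sensorName": "a"},
    {"sensorId": "ID2", "sensorVersion": "v1", "sensorValue": 12.0, "sensorName": "b"},
    {"sensorId": "ID1", "sensorVersion": "v2", "sensorValue": 47.1, "sensorName": "a"},
]

# Plays the role of filteredSensorStream in the query.
filtered_sensor_stream = list(filter_sensor_events(sample_events))
```

Only the two events with sensorId "ID1" reach the output, and each output event carries just the three selected attributes.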
Prerequisites
- See Prerequisites in the CEP Samples Setup page.
- Download the Kafka server (tested with kafka_2.10-0.8.1) from http://kafka.apache.org/downloads.html.
- Start the ZooKeeper server with the command bin/zookeeper-server-start.sh config/zookeeper.properties and check the startup logs in the terminal (see http://kafka.apache.org/documentation.html for more information).
- Then start the Kafka server with the command bin/kafka-server-start.sh config/server.properties and check the startup logs in the terminal.
- Then copy the necessary client JARs to the CEP server: copy kafka_2.10-0.8.1.jar, zkclient-0.3.jar and zookeeper-3.3.4.jar from <Kafka_Server>/lib/ to <CEP_HOME>/repository/components/lib and <CEP_HOME>/samples/lib.
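If you prefer to script the JAR copy step, the following is a minimal sketch under stated assumptions. The function name copy_kafka_client_jars is hypothetical, and the two arguments stand in for the <Kafka_Server> and <CEP_HOME> directories on your machine. The JAR names match the tested Kafka version listed above and may differ in other releases.

```python
import shutil
from pathlib import Path

# JAR names taken from the prerequisites above (Kafka 0.8.1 built for Scala 2.10).
KAFKA_CLIENT_JARS = ["kafka_2.10-0.8.1.jar", "zkclient-0.3.jar", "zookeeper-3.3.4.jar"]


def copy_kafka_client_jars(kafka_home, cep_home):
    """Copy the Kafka client JARs into both CEP lib directories.

    kafka_home and cep_home are placeholders for <Kafka_Server> and <CEP_HOME>.
    Returns the list of destination paths that were written.
    """
    source_dir = Path(kafka_home) / "lib"
    target_dirs = [
        Path(cep_home) / "repository" / "components" / "lib",
        Path(cep_home) / "samples" / "lib",
    ]
    copied = []
    for target_dir in target_dirs:
        # Fail early instead of creating directories under a wrong CEP home.
        if not target_dir.is_dir():
            raise NotADirectoryError(f"Missing CEP directory: {target_dir}")
        for jar_name in KAFKA_CLIENT_JARS:
            source = source_dir / jar_name
            if not source.is_file():
                raise FileNotFoundError(f"Missing Kafka JAR: {source}")
            copied.append(Path(shutil.copy2(source, target_dir / jar_name)))
    return copied


# Example call (paths are placeholders, adjust them to your installation):
# copy_kafka_client_jars("/opt/kafka_2.10-0.8.1", "/opt/wso2cep")
```

The sketch raises an error rather than silently skipping a missing JAR, since the sample depends on all three.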
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0110. For instructions, see Starting sample CEP configurations. This sample configuration does the following:
- Creates the <CEP_HOME>/repository/conf/stream-manager-config.xml file, which is used to create the stream definitions for the sample.
Executing the sample
1. Open a terminal, go to the Kafka server home directory and create the necessary topics.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sensorStream
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic filteredSensorStream
2. Open another terminal, go to the Kafka server home directory and run the command below to consume the messages coming from CEP (Kafka output adaptor).
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic filteredSensorStream --from-beginning
3. Open another terminal, go to <CEP_HOME>/samples/producers/sensor-stats and run the ant command. It sends some XML events to the Kafka input event adaptor of the CEP server.
In the terminal opened in step 2, you can see the processed events emitted by CEP.
For example, given below is part of the console output of the consumer when sending events from the producer.
Refer to the Kafka documentation for more information: http://kafka.apache.org/documentation.html