Sample 0110 - Sensor Stream Processor
Introduction
This sample demonstrates how to process the sensor statistics. Here we have used kafka server as both input and outputÂ
The query used in this sample is as follows:
from sensorInput[sensorId == "ID1"] select sensorId, sensorVersion, sensorValue insert into filteredSensorStream;
Prerequisites
- See Prerequisites in CEP Samples Setup page.
Download Apache Kafka server.Â
This sample is tested using Kafka 2.10-0.8.1 version.
- Copy the following client JAR files from
<KAFKA_HOME>/lib/
directory  to<CEP_HOME>/repository/components/lib/
directory.kafka_2.10-0.8.1.jar
zkclient-0.3.jar
scala-library-2.10.1.jar
zookeeper-3.3.4.jar
- Copy all the JAR files, which are located in
<KAFKA_HOME>/lib/
directory toÂ<CEP_HOME>/samples/lib/
directory. - Start the Zookeeper server with command bin/zookeeper-server-start.sh config/zookeeper.properties , the  you can see below logs (see http://kafka.apache.org/documentation.html for more information).
- Then start kafka server with command bin/kafka-server-start.sh config/server.properties , then you can see below logs.
Building the sample
Start the WSO2 CEP server with the sample configuration numbered 0110. For instructions, see Starting sample CEP configurations. This sample configuration does the following:
- Creates
<CEP_HOME>/repository/conf/data-bridge/stream-definitions.xml
file, which is used to create the stream definitions for the sample.
Executing the sample
Open a terminal, go to kafka server home directory and create necessary topics.
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sensorStream
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic filteredSensorStreamÂOpen another terminal, go to kafka server home directory and run below command to consume message which coming from CEP (kafka output adapter).
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic filteredSensorStream --from-beginning
Open another terminal, go to
<CEP_HOME>/samples/producers/sensor-stats
and run ant command. It send some xml events to CEP server to kafka input event adapter.From the terminal opened in step 2, you can the processed events which emitted by CEP.
For example, given below is part of the console output of the consumer when sending events from the producer.
Refer Kafka documentation for more information - http://kafka.apache.org/documentation.html