This documentation is for WSO2 CEP 3.1.0.

Introduction

This sample demonstrates how to process sensor statistics. A Kafka server is used as both the input (event source) and the output (event sink).

The query used in this sample is as follows:

from sensorInput[sensorId == "ID1"]
select sensorId, sensorVersion, sensorValue
insert into filteredSensorStream;
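
To make the effect of the query concrete, the following is a minimal sketch that mimics the same filter and projection on comma-separated lines with awk. It is only an illustration, not how CEP executes the query. The CSV layout and the fourth `location` column are hypothetical, added only to show that attributes outside the select clause are dropped.

```shell
#!/bin/sh
# Illustrative stand-in for the query above, NOT the CEP engine itself.
# Assumed (hypothetical) input layout, one event per line:
#   sensorId,sensorVersion,sensorValue,location
filter_sensor_events() {
  # Keep only events whose sensorId is ID1 and emit the three selected attributes.
  awk -F',' '$1 == "ID1" { print $1 "," $2 "," $3 }'
}

# Example: two events go in, only the ID1 event comes out.
printf '%s\n' 'ID1,v1,45.6,lab' 'ID2,v1,12.3,roof' | filter_sensor_events
# prints: ID1,v1,45.6
```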

Prerequisites

  1. See Prerequisites in the CEP Samples Setup page.
  2. Download the Kafka server (this sample was tested with kafka_2.10-0.8.1) from http://kafka.apache.org/downloads.html
  3. Start the ZooKeeper server with the command bin/zookeeper-server-start.sh config/zookeeper.properties and check the startup logs printed in the terminal (see http://kafka.apache.org/documentation.html for more information).
  4. Start the Kafka server with the command bin/kafka-server-start.sh config/server.properties and check the startup logs printed in the terminal.
  5. Copy the required Kafka client JARs to the CEP server:
    Copy kafka_2.10-0.8.1.jar, zkclient-0.3.jar and zookeeper-3.3.4.jar from <Kafka_Server>/lib/ to both <CEP_HOME>/repository/components/lib and <CEP_HOME>/samples/lib.
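
The JAR copy in step 5 can be scripted. The sketch below assumes two environment variable names, KAFKA_JAR_DIR and CEP_HOME, which are not part of the sample; set them to the Kafka directory that contains the JARs and to your CEP installation. The JAR directory is passed as a parameter because its name can differ between Kafka distributions (for example libs instead of lib).

```shell
#!/bin/sh
# Sketch of step 5. KAFKA_JAR_DIR and CEP_HOME are assumed names;
# point them at your own Kafka JAR directory and CEP installation.
copy_kafka_client_jars() {
  jar_dir=$1
  cep_home=$2
  for jar in kafka_2.10-0.8.1.jar zkclient-0.3.jar zookeeper-3.3.4.jar; do
    for dest in "$cep_home/repository/components/lib" "$cep_home/samples/lib"; do
      # Stop at the first failed copy so a missing JAR is noticed.
      cp "$jar_dir/$jar" "$dest/" || return 1
    done
  done
}

# Runs only when both variables are set.
if [ -n "${KAFKA_JAR_DIR:-}" ] && [ -n "${CEP_HOME:-}" ]; then
  copy_kafka_client_jars "$KAFKA_JAR_DIR" "$CEP_HOME"
fi
```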

Building the sample

Start the WSO2 CEP server with the sample configuration numbered 0110. For instructions, see Starting sample CEP configurations. This sample configuration does the following:

  • Creates the <CEP_HOME>/repository/conf/stream-manager-config.xml file, which is used to create the stream definitions for the sample.

Executing the sample

  1. Open a terminal, go to the Kafka server home directory, and create the required topics.

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sensorStream
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic filteredSensorStream 

  2. Open another terminal, go to the Kafka server home directory, and run the following command to consume the messages that CEP publishes through the Kafka output adaptor.

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic filteredSensorStream --from-beginning

  3. Open another terminal, go to <CEP_HOME>/samples/producers/sensor-stats, and run the ant command. It sends some XML events to the CEP server through the Kafka input event adaptor.

  4. In the terminal opened in step 2, you can see the processed events emitted by CEP.

    For example, given below is part of the console output of the consumer when sending events from the producer.

Refer to the Kafka documentation for more information: http://kafka.apache.org/documentation.html
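
The topic-creation commands from step 1 of "Executing the sample" can also be wrapped in a small helper that creates both topics and then lists the existing topics for confirmation. This is a sketch under assumptions: KAFKA_HOME is an assumed variable name for the Kafka server home directory, and the ZooKeeper address defaults to the localhost:2181 value used in the commands above.

```shell
#!/bin/sh
# Sketch only: wraps the kafka-topics.sh calls from step 1.
# KAFKA_HOME is an assumed name for the Kafka server home directory.
create_sample_topics() {
  kafka_home=$1
  zookeeper=${2:-localhost:2181}
  for topic in sensorStream filteredSensorStream; do
    "$kafka_home/bin/kafka-topics.sh" --create --zookeeper "$zookeeper" \
      --replication-factor 1 --partitions 1 --topic "$topic" || return 1
  done
  # List the topics so you can confirm that both were created.
  "$kafka_home/bin/kafka-topics.sh" --list --zookeeper "$zookeeper"
}

# Runs only when KAFKA_HOME is set.
if [ -n "${KAFKA_HOME:-}" ]; then
  create_sample_topics "$KAFKA_HOME"
fi
```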