The Apache Kafka event receiver consumes messages from a Kafka topic and passes them to the WSO2 product engine. This feature was donated by Andres Gomez Ferrer. For more information, go to Apache Kafka documentation.
Prerequisites
Set up the following prerequisites before you configure an Apache Kafka event receiver.
- Download Apache Kafka server.
- Copy the following client JAR files from the <KAFKA_HOME>/lib/ directory to the <PRODUCT_HOME>/repository/components/lib/ directory.
  - kafka_2.10-0.8.1.jar
  - zkclient-0.3.jar
  - scala-library-2.10.1.jar
  - zookeeper-3.3.4.jar
  - kafka-clients-0.8.2.1.jar
  - metrics-core-2.2.0.jar

  Kafka_2.10-0.9.0.1 is backward compatible. Therefore, you can use Kafka_2.10-0.8.2.1 client jars to connect with Kafka_2.10-0.9.0.1.
  If you are using Kafka_2.11-0.10.0.0, you need to download this jar and save it in the <PRODUCT_HOME>/repository/components/lib/ directory.
- Start the Apache Kafka server.
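The JAR-copy prerequisite above can be scripted. The following is a minimal sketch, not part of the product: the function name `copy_client_jars` is hypothetical, the `.jar` extension is assumed for every listed file, and the two home directories are whatever paths your installation uses.

```python
import shutil
from pathlib import Path

# JAR names taken from the prerequisites list (".jar" assumed for each entry).
CLIENT_JARS = [
    "kafka_2.10-0.8.1.jar",
    "zkclient-0.3.jar",
    "scala-library-2.10.1.jar",
    "zookeeper-3.3.4.jar",
    "kafka-clients-0.8.2.1.jar",
    "metrics-core-2.2.0.jar",
]


def copy_client_jars(kafka_home, product_home, jars=CLIENT_JARS):
    """Copy the listed JARs from <KAFKA_HOME>/lib/ to
    <PRODUCT_HOME>/repository/components/lib/.

    Returns (copied, missing): two lists of file names.
    """
    src = Path(kafka_home) / "lib"
    dst = Path(product_home) / "repository" / "components" / "lib"
    dst.mkdir(parents=True, exist_ok=True)

    copied, missing = [], []
    for name in jars:
        if (src / name).is_file():
            shutil.copy2(src / name, dst / name)
            copied.append(name)
        else:
            # Report the gap instead of failing, so every missing JAR is listed.
            missing.append(name)
    return copied, missing
```

Calling `copy_client_jars("/opt/kafka", "/opt/wso2cep")` (both paths are placeholders) returns the names that were copied and the names that were not found, which makes a version mismatch in <KAFKA_HOME>/lib/ easy to spot.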
Creating an Apache Kafka event receiver
For instructions on creating an Apache Kafka event receiver, see Configuring Event Receivers.
Configuring adapter properties
Specify the Adapter Properties when creating an Apache Kafka event receiver using the management console.
After entering the adapter properties, select the Event Stream to which you want to map the incoming events, and the Message Format that you want to apply to the received events. You can also click Advanced to define custom input mappings based on the Message Format you selected. For more information on custom input mapping types, see Input Mapping Types.
You can also define the adapter properties of the event receiver, based on the transport type, within the <from> element of the event receiver configuration in the <PRODUCT_HOME>/repository/deployment/server/eventreceivers/ directory as follows.
    <eventReceiver name="KafkaInputEventAdpater" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
        <from eventAdapterType="kafka">
            <property name="topic">test_topic</property>
            <property name="zookeeper.connect">127.0.0.1</property>
            <property name="threads">4</property>
            <property name="optional.configuration">zk.sessiontimeout.ms:6000</property>
            <property name="group.id">groupid</property>
            <property name="events.duplicated.in.cluster">false</property>
        </from>
        .....................
    </eventReceiver>
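To check a hand-edited receiver file before deploying it, the <from> element can be read with a short script. This is a sketch under stated assumptions: the function name `read_adapter_properties` is hypothetical, and `SAMPLE` repeats the configuration above without the elided part so that it is well-formed XML.

```python
import xml.etree.ElementTree as ET

# Default namespace declared on <eventReceiver> in the configuration above.
NS = {"er": "http://wso2.org/carbon/eventreceiver"}

SAMPLE = """<eventReceiver name="KafkaInputEventAdpater" statistics="disable"
    trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
  <from eventAdapterType="kafka">
    <property name="topic">test_topic</property>
    <property name="zookeeper.connect">127.0.0.1</property>
    <property name="threads">4</property>
    <property name="optional.configuration">zk.sessiontimeout.ms:6000</property>
    <property name="group.id">groupid</property>
    <property name="events.duplicated.in.cluster">false</property>
  </from>
</eventReceiver>"""


def read_adapter_properties(xml_text):
    """Return (adapter_type, properties) from an event receiver definition."""
    root = ET.fromstring(xml_text)
    source = root.find("er:from", NS)
    if source is None:
        raise ValueError("no <from> element found in the event receiver")
    # Every <property name="..."> child becomes one dictionary entry.
    props = {
        prop.get("name"): (prop.text or "").strip()
        for prop in source.findall("er:property", NS)
    }
    return source.get("eventAdapterType"), props


adapter_type, props = read_adapter_properties(SAMPLE)
print(adapter_type, sorted(props))
```

All values come back as strings, so a property such as `threads` still needs an explicit `int(...)` conversion if you want to validate it numerically.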
The above adapter properties are described below.
Adapter Property | Description | Configuration file property | Example |
---|---|---|---|
Server Zookeeper IP | IP address of the Zookeeper Server | zookeeper.connect | 127.0.0.1 |
Group ID Kafka | Kafka consumer group ID, which identifies the set of consumers that belong to the same consumer group | group.id | groupid |
Threads | Number of consumer threads | threads | 4 |
Optional Configuration Properties | Valid property and value pairs that denote optional configuration properties for Apache Kafka. (E.g. "property1: value1, property2: value2") For more information on these properties, go to Apache Kafka Documentation. | optional.configuration | zk.sessiontimeout.ms:6000 |
Topic Kafka | Name of the Kafka topic to which input messages are published | topic | test_topic |
Is events duplicated in cluster | Whether the same event can reach two receiver nodes in a cluster | events.duplicated.in.cluster | true/false |
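The "property1: value1, property2: value2" format of the Optional Configuration Properties field can be illustrated with a small parser. This is only a sketch of the format as described in the table: how the adapter itself parses the string is not stated here, and the function name `parse_optional_configuration` is hypothetical.

```python
def parse_optional_configuration(raw):
    """Split a 'property1: value1, property2: value2' string into a dict.

    Assumption: pairs are separated by commas and the first colon in a pair
    separates the property name from its value.
    """
    pairs = {}
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate a trailing comma or an empty string
        key, sep, value = item.partition(":")
        if not sep or not key.strip():
            raise ValueError(f"expected 'property:value', got {item!r}")
        pairs[key.strip()] = value.strip()
    return pairs


print(parse_optional_configuration("zk.sessiontimeout.ms:6000"))
```

Splitting on the first colon keeps values such as `host:port` intact, but a value that itself contains a comma would be split incorrectly under this assumption.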
Related samples
For more information on the Kafka event receiver type, see the following sample in WSO2 CEP Documentation.