Prerequisites
Complete the following prerequisites before configuring an Apache Kafka event publisher.
- Download the Apache Kafka server.
- Configure WSO2 CEP by adding the relevant JARs to support the Kafka transport.
- Start the Apache Kafka server. For more information, see Apache Kafka documentation.
Creating a Kafka event publisher
For instructions on creating a Kafka event publisher, see Publishing Events.
Configuring global properties
The following global properties can be set for the Kafka event publisher type in the <CEP_HOME>/repository/conf/output-event-adapters.xml file. These properties apply to all publishers of the kafka type. If a global property that is available by default is removed, its default value is used.
Custom properties cannot be added as global properties.
Property Key | Description | Data Type | Default Value |
---|---|---|---|
minThread | The minimum number of threads (including idle threads) that should be available in the thread pool at a given time. | Integer | 8 |
maxThread | The maximum number of threads (including idle threads) that should be available in the thread pool at a given time. | Integer | 100 |
keepAliveTimeInMillis | The maximum number of milliseconds that idle threads should be kept alive when the total number of threads in the pool exceeds the number of cores in the machine. | Integer | 20000 |
jobQueueSize | The maximum number of jobs that can be held in the job queue at a given time. | Integer | 10000 |
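As a rough sketch, the entry for the kafka type in the global adapter configuration file might look like the following. The element names (outputEventAdaptersConfig, adapterConfig) and the key attribute are assumptions based on the usual layout of that file, so check them against the copy shipped with your installation. The values shown are the defaults from the table above.

```xml
<!-- Assumed layout; verify element and attribute names in your own file. -->
<outputEventAdaptersConfig>
    <adapterConfig type="kafka">
        <!-- Thread pool bounds shared by all publishers of the kafka type -->
        <property key="minThread">8</property>
        <property key="maxThread">100</property>
        <!-- How long idle threads are kept alive, in milliseconds -->
        <property key="keepAliveTimeInMillis">20000</property>
        <!-- Capacity of the job queue -->
        <property key="jobQueueSize">10000</property>
    </adapterConfig>
</outputEventAdaptersConfig>
```

Removing any one of these property elements would cause its default value to apply, as described above.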
Configuring adapter properties
Specify the static and dynamic adapter properties when creating a Kafka event publisher using the management console.
After entering the adapter properties, select the Message Format to apply to the published events. You can also click Advanced to define custom output mappings based on the Message Format you selected. For more information on custom output mapping types, see Publishing Events in Various Event Formats.
You can also define the adapter properties of the event publisher, based on the transport type, within the <to> element of the event publisher configuration in the <PRODUCT_HOME>/repository/deployment/server/eventpublishers/ directory as follows.
<eventPublisher name="KafkaOutputEventAdapter" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
    ....................
    <to eventAdapterType="kafka">
        <property name="topic">test_topic</property>
        <property name="optional.configuration">{property_name1:property_value1, property_name2:property_value2}</property>
        <property name="meta.broker.list">{host1:port1,host2:port2}</property>
    </to>
</eventPublisher>
The above adapter properties are described below.
Static adapter properties
Adapter Property | Description | Configuration file property | Example |
---|---|---|---|
Meta Broker List | The list of brokers used for bootstrapping. The producer uses it only to fetch metadata, so it can be a subset of the brokers in the cluster. | meta.broker.list | {host1:port1,host2:port2} |
Optional Configuration Properties | Additional producer configuration properties, given as name and value pairs. | optional.configuration | {property_name1:property_value1, property_name2:property_value2} |
Dynamic adapter properties
Adapter Property | Possible Values | Description | Configuration file property | Example |
---|---|---|---|---|
Topic | sensorStream | Name of the Kafka topic to which messages are published. | topic | test_topic |