Prerequisites
Set up the below prerequisites to start configuring an Apache Kafka event publisher.
- Download Apache Kafka server.
- Configure WSO2 CEP by adding relevant jars to support Kafka transport.
- Start the Apache Kafka server. For more information, see Apache Kafka documentation.
Creating a Kafka event publisher
For instructions on creating a Kafka event publisher, see Publishing Events .
Configuring global properties
The following global properties can be set for the Kafka event publisher type in the <CEP_HOME>/repository/conf/input-event-adapters.xml
file. These properties apply to all the publishers of the kafka
type. If a global property available by default is removed, the default value of the property is considered.
Custom properties cannot be added as global properties.
Property Key | Description | Data Type | Default Value |
---|---|---|---|
minThread | The minimum number of threads (including idle threads) that should be available in the thread pool at a given time. | Integer | 8 |
maxThread | The maximum number of threads (including idle threads) that should be available in the thread pool at a given time. | Integer | 100 |
keepAliveTimeInMillis | The maximum number of milliseconds that idle threads should be kept alive when the total number of threads in the pool exceeds the number of cores in the machine. | Integer | 20000 |
jobQueueSize | The maximum number of milliseconds that idle threads should be kept alive when the total number of threads in the pool exceeds the number of cores in the machine. | Integer | 10000
|
Configuring adapter properties
Specify the Static and Dynamic Adapter Properties, when creating a Kafka JMS event publisher using the management console as shown below.
After entering the above adapter properties, select the Message Format which you want to apply on the published events . Also, click Advanced to define custom output mappings based on the Message Format you selected. For more information on custom output mapping types, see Publishing Events in Various Event Formats .
You can also d efine the respective adapter properties of the event publisher based on the transport type within the <to>
element of the event publisher configuration in the <PRODUCT_HOME>/repository/deployment/server/eventpublishers/
directory as follows.
<eventPublisher name="KafkaOutputEventAdapter" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher"> .................... <to eventAdapterType="kafka"> <property name="topic">test_topic</property> <property name="optional.configuration">{property_name1:property_value1, property_name2:property_value2}</property> <property name="meta.broker.list">{host1:port1,host2:port2}</property> </to> </eventPublisher>
The above adapter properties are described below.
Static adapter properties
Adapter Property | Description | Configuration file property | Example |
---|---|---|---|
Meta Broker List | This is for bootstrapping and the producer will only use it for getting metadata. The list can be a subset of brokers. | meta.broker.list | {host1:port1,host2:port2} |
Optional Configuration Properties | Define optional configuration properties | optional.configuration | {property_name1:property_value1, property_name2:property_value2} |
Dynamic adapter properties
Adapter Property | Possible Values | Description | Configuration file property | Example |
---|---|---|---|---|
Topic | sensorStream | Name of the Kafka topic to which, input messages are published | topic | test_topic |