com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links' is unknown.

Kafka Event Publisher

Kafka event publisher publishes Siddhi events as xml,text, and JSON messages to a user-defined Kafka topic on a user-defined Kafka broker. This feature is donated by Andres Gomez Ferrer. For more information on Apache Kafka, see Apache Kafka documentation.

Prerequisites

Set up the below prerequisites to start configuring an Apache Kafka event publisher.

  1. Download Apache Kafka server
  2. Copy the following client JAR files from <KAFKA_HOME>/lib/ directory  to <PRODUCT_HOME>/repository/components/lib/ directory.
    • kafka_2.10-0.8.1.jar
    • zkclient-0.3.jar
    • scala-library-2.10.1.jar
    • zookeeper-3.3.4.jar
    • kafka-clients-0.8.2.1
    • metrics-core-2.2.0

Kafka_2.10-0.9.0.1 is backward compatible. Therefore, you can use Kafka_2.10-0.8.2.1 client jars to connect with Kafka_2.10-0.9.0.1.

Creating a Kafka event publisher

For instructions on creating a Kafka event publisher, see Creating Alerts.

Configuring global properties

The following global properties can be set for the Kafka event publisher type in the <DAS_HOME>/repository/conf/input-event-adapters.xml file. These properties apply to all the publishers of the kafka type. If a global property available by default is removed, the default value of the property is considered.

Custom properties cannot be added as global properties.

Property KeyDescriptionData TypeDefault Value
minThreadThe minimum number of threads (including idle threads) that should be available in the thread pool at a given time.Integer8
maxThreadThe maximum number of threads (including idle threads) that should be available in the thread pool at a given time.Integer100
keepAliveTimeInMillisThe maximum number of milliseconds that idle threads should be kept alive when the total number of threads in the pool exceeds the number of cores in the machine.Integer20000
jobQueueSizeThe maximum number of milliseconds that idle threads should be kept alive when the total number of threads in the pool exceeds the number of cores in the machine.Integer10000

Configuring adapter properties

Specify the Static and Dynamic Adapter Properties, when creating a Kafka JMS event publisher using the management console as shown below.

Apache Kafka publisher

After entering the above adapter properties, select  the Message Format which you want to apply on the published events . Also, click  Advanced to define custom output mappings based on the  Message Format you selected. For more information on custom output mapping types, see  Output Mapping Types .

You can also d efine the respective adapter properties of the event publisher based on the transport type within the  <to>  element of the event publisher configuration in the <PRODUCT_HOME>/repository/deployment/server/eventpublishers/ directory as follows.

<eventPublisher name="KafkaOutputEventAdapter" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
  ....................
  <to eventAdapterType="kafka">
    <property name="topic">test_topic</property>
    <property name="optional.configuration">{property_name1:property_value1, property_name2:property_value2}</property>
    <property name="meta.broker.list">{host1:port1,host2:port2}</property>
  </to>
</eventPublisher>

The above adapter properties are described below.

Static adapter properties

Adapter Property
Description
Configuration file propertyExample

Meta Broker List

This is for bootstrapping and the producer will only use it for getting metadata. The list can be a subset of brokers.

meta.broker.list
{host1:port1,host2:port2}

Optional Configuration Properties

Define optional configuration properties

optional.configuration

{property_name1:property_value1, property_name2:property_value2}

Dynamic adapter properties

Adapter Property
Possible Values
Description
Configuration file propertyExample

Topic

sensorStream

Name of the Kafka topic to which, input messages are published

topic
test_topic

Related samples

For more information on kafka event publisher type, see the following sample in WSO2 CEP Documentation.

com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links2' is unknown.