com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_link3' is unknown.

Kafka Inbound Protocol

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system. Kafka maintains feeds of messages in topics. Producers write data to topics and consumers read from topics. For more information on Apache Kafka, go to Apache Kafka documentation.

WSO2 ESB kafka inbound endpoint acts as a message consumer. It creates a connection to ZooKeeper and requests messages for a topic, topics or topic filters.

In order to use the kafka inbound endpoint, you need to download and install Apache Kafka. The recommended version is kafka_2.9.2-0.8.1.1.

To configure the kafka inbound endpoint, copy the following client libraries from the <KAFKA_HOME>/libs directory to the  <ESB_HOME>/repository/components/lib directory.

  • kafka_2.9.2-0.8.1.1.jar
  • scala-library-2.9.2.jar

  • zkclient-0.3.jar

  • zookeeper-3.3.4.jar

  • metrics-core-2.2.0.jar

Note

  • If you are using kafka_2.x.x-0.8.2.0 or later, you also need to add the kafka-clients-0.8.x.x.jar file to the <ESB_HOME>/repository/components/lib directory.
  • If you are using a newer version of ZooKeeper, add the jaas.conf file to the <ESB_HOME>/repository/conf/identity directory. This is required because Kerberos authentication is enforced on newer versions of ZooKeeper.

Configuration parameters for a kafka inbound endpoint are XML fragments that represent various properties.

Following is a sample high level kafka configuration that can be used to consume messages using the specified topic or topics:

<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KakfaListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFaulte"
                 protocol="kafka"
                 suspend="false">
   <parameters>
      <parameter name="interval">100</parameter> 
	  <parameter name="coordination">true</parameter>
      <parameter name="sequential">true</parameter>
      <parameter name="zookeeper.connect">localhost:2181</parameter>
	  <parameter name="consumer.type">highlevel</parameter>
      <parameter name="content.type">application/xml</parameter>
      <parameter name="topics">test,sampletest</parameter>
      <parameter name="group.id">test-group</parameter>
   </parameters>
</inboundEndpoint>

Kafka inbound endpoint parameters for a high level configuration

Parameter


Description


Required


Possible ValuesDefault Value
zookeeper.connect
The host and port of a ZooKeeper server (hostname:port).Yeslocalhost:2181
consumer.type
The consumer configuration type. This can either be simple or highlevel.Yeshighlevel, simple
interval
The polling interval for the inbound endpoint to poll the messages.Yes

coordination
If set to true in a cluster setup, this will run the inbound only in a single worker node.Yestrue, falsetrue
sequential
The behaviour when executing the given sequence.Yestrue, falsetrue
topics
The category to feed the messages. A high level kafka configuration can have more than one topic. You can specify multiple topic names as comma separated values.Yes

content.type
The content of the message.Yesappllication/xml, application/json
group.id

If all the consumer instances have the same consumer group, this works as a traditional queue balancing the load over the consumers.

If all the consumer instances have different consumer groups, this works as publish-subscribe and all messages are broadcast to all consumers.

Yes

thread.count
The number of threads.No
1
consumer.idThe id of the consumer.No
null
socket.timeout.msThe socket timeout for network requests.No
30 * 1000
socket.receive.buffer.bytesThe socket receive buffer for network requests.No
64 * 1024
fetch.message.max.bytesThe number of byes of messages to attempt to fetch for each topic-partition in each fetch request.No
1024 * 1024
num.consumer.fetchersThe number fetcher threads used to fetch data.No
1
auto.commit.enableThe committed offset to be used as the position from which the new consumer will begin when the process fails.Notrue, falsetrue
auto.commit.interval.msThe frequency in ms that the consumer offsets are committed to zookeeper.No
60 * 1000
queued.max.message.chunksThe maximum number of message chunks buffered for consumption. Each chunk can go up to the value specified in fetch.message.max.bytes.No
2
rebalance.max.retriesThe maximum number of retry attempts.No
4
fetch.min.bytesThe minimum amount of data the server should return for a fetch request.No
1
fetch.wait.max.msThe maximum amount of time the server will block before responding to the fetch request when sufficient data is not available to immediately serve fetch.min.bytes.No
100
rebalance.backoff.msThe backoff time between retries during rebalance.No
2000
refresh.leader.backoff.msThe backoff time to wait before trying to determine the leader of a partition that has just lost its leader.No
200
auto.offset.reset

Set this to one of the following values based on what you need to do when there is no initial offset in ZooKeeper or if an offset is out of range.

smallest - Automatically reset the offset to the smallest offset.
largest - Automatically reset the offset to the largest offset.
anything else - Throw an exception to the consumer.

Nosmallest, largest, anything elselargest
consumer.timeout.msThe timeout interval after which a timeout exception is to be thrown to the consumer if no message is available for consumption. It is a good practice to set this value lower than the interval of the Kafka inbound endpoint.No
3000
exclude.internal.topicsSet to true if messages from internal topics such as offsets should be exposed to the consumer.Notrue, falsetrue
partition.assignment.strategyThe partitions assignment strategy to be used when assigning partitions to consumer streams.Norange, roundrobinrange
client.idThe user specified string sent in each request to help trace calls.No
value of group  id

zookeeper.session.timeout.ms

The ZooKeeper session timeout value in milliseconds.No
6000
zookeeper.connection.timeout.msThe maximum time in milliseconds that the client should wait while establishing a connection to ZooKeeper.No
6000

zookeeper.sync.time.ms

The time difference in milliseconds that a ZooKeeper follower can be behind a ZooKeeper leader.No
2000
offsets.storageThe offsets storage location.Nozookeeper, kafkazookeeper
offsets.channel.backoff.msThe backoff period in milliseconds when reconnecting the offsets channel or retrying failed offset fetch/commit requests.No
1000
offsets.channel.socket.timeout.msThe socket timeout in milliseconds when reading responses for offset fetch/commit requests.No
10000
offsets.commit.max.retriesThe maximum retry attempts allowed. If a consumer metadata request fails for any reason, retry takes place but does not have an impact on this limit.No
5
dual.commit.enabledIf offsets.storage is set to kafka, the commit offsets can be dual to ZooKeeper. Set this to true if you need to perform migration from zookeeper-based offset storage to kafka-based offset storage.Notrue, falsetrue

Following is a sample high level kafka configuration that can be used to consume messages using the topics filter,  which can either be a white list or a black list:

<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KakfaListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFaulte"
                 protocol="kafka"
                 suspend="false">
   <parameters>
      <parameter name="interval">100</parameter> 
	  <parameter name="coordination">true</parameter>
      <parameter name="sequential">true</parameter>
      <parameter name="zookeeper.connect">localhost:2181</parameter>
	  <parameter name="consumer.type">highlevel</parameter>
      <parameter name="content.type">application/xml</parameter>
      <parameter name="topic.filter">test</parameter>
      <parameter name="filter.from.whitelist">true</parameter>
      <parameter name="group.id">test-group</parameter>      
   </parameters>
</inboundEndpoint>

In the above configuration, you will see that the following parameters are set to consume topic filters:

Note

In high level kafka configurations, the follwing parameters are used instead of the topics paramater.

<parameter name="topic.filter">test</parameter>
<parameter name="filter.from.whitelist">true</parameter>


Following are descriptions of the parameters set to consume topic filters in a kafka configuration:

Parameter


Description


Required


topic.filter
The name of the topic filter.Yes
filter.from.whitelist
If this is set to true, messages are consumed from the whitelist(include).
If this is set to false, messages are consumed from the blacklist(exclude).
Yes

Following is a sample low level kafka configuration that can be used to consume messages from a specific server in a specific partition, so that the messages are limited:

<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KakfaListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFaulte"
                 protocol="kafka"
                 interval="1000"
                 suspend="false">
   <parameters>     
      <parameter name="zookeeper.connect">localhost:2181</parameter>
      <parameter name="group.id">test-group</parameter>  
      <parameter name="content.type">application/xml</parameter>
      <parameter name="consumer.type">simple</parameter>
      <parameter name="simple.max.messages.to.read">5</parameter>
      <parameter name="simple.topic">test</parameter>
      <parameter name="simple.brokers">localhost</parameter>
      <parameter name="simple.port">9092</parameter>
      <parameter name="simple.partition">1</parameter>
      <parameter name="interval">1000</parameter>
   </parameters>
</inboundEndpoint>

Kafka inbound endpoint parameters for a low level configuration

Parameter                                                    


Description                                                                               


Required                                                              


simple.topic
The category to feed the messages.Yes
simple.brokers
The specific Kafka broker name.Yes
simple.port
The specific Kafka server port number.Yes
simple.partition
The partition of the topic.Yes
simple.max.messages.to.read

The maximum number of messages to retrieve.

Yes

Samples

For a sample that demonstrates how one way message bridging from Kafka to HTTP can be done using the kafka inbound endpoint, see Sample 904: Kafka Inbound Endpoint Sample.

com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links2' is unknown.