Kafka is a distributed, partitioned, replicated commit log service that provides the functionality of a messaging system. Kafka maintains feeds of messages in topics: producers write data to topics, and consumers read from topics. For more information, see the Apache Kafka documentation.
The Kafka inbound endpoint acts as a message consumer. It creates a connection to ZooKeeper and requests messages for a topic, a set of topics, or a topic filter.
To use the Kafka inbound endpoint, you need to download and install Apache Kafka. The recommended version is kafka_2.9.2-0.8.1.1 or later.
To configure the Kafka inbound endpoint, copy the following client libraries from the <KAFKA_HOME>/lib directory to the <EI_HOME>/lib directory:
- kafka_2.9.2-0.8.1.1.jar
- scala-library-2.9.2.jar
- zkclient-0.3.jar
- zookeeper-3.3.4.jar
- metrics-core-2.2.0.jar
Note
If you use kafka_2.x.x-0.8.2.0 or later, you also need to add kafka-clients-0.8.x.x.jar to the <EI_HOME>/lib directory.
Configuration parameters for a Kafka inbound endpoint are XML fragments that represent various properties.
Following is a sample high-level Kafka configuration that can be used to consume messages using the specified topic or topics:
```xml
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KafkaListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFault"
                 protocol="kafka"
                 suspend="false">
   <parameters>
      <parameter name="interval">100</parameter>
      <parameter name="coordination">true</parameter>
      <parameter name="sequential">true</parameter>
      <parameter name="zookeeper.connect">localhost:2181</parameter>
      <parameter name="consumer.type">highlevel</parameter>
      <parameter name="content.type">application/xml</parameter>
      <parameter name="topics">test,sampletest</parameter>
      <parameter name="group.id">test-group</parameter>
   </parameters>
</inboundEndpoint>
```
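The endpoint injects each consumed message into the sequence named in its sequence attribute (requestHandlerSeq above); this sequence must be defined separately. As a minimal sketch for illustration only, the following hypothetical sequence simply logs each consumed Kafka message and drops it:

```xml
<!-- Hypothetical sequence for illustration: logs each consumed Kafka message and drops it. -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="requestHandlerSeq">
   <log level="full"/>
   <drop/>
</sequence>
```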
Kafka inbound endpoint parameters for a high-level configuration
Parameter | Description | Required | Possible Values | Default Value |
---|---|---|---|---|
zookeeper.connect | The host and port of a ZooKeeper server (hostname:port). | Yes | localhost:2181 | |
consumer.type | The consumer configuration type. This can either be simple or highlevel. | Yes | highlevel, simple | |
interval | The polling interval for the inbound endpoint to poll messages. | Yes | | |
coordination | If set to true in a cluster setup, the inbound endpoint runs only in a single worker node. | Yes | true, false | true |
sequential | Whether messages should be injected to the sequence sequentially. | Yes | true, false | true |
topics | The category to feed the messages. A high-level Kafka configuration can have more than one topic. You can specify multiple topic names as comma-separated values. | Yes | | |
content.type | The content type of the message. | Yes | application/xml, application/json | |
group.id | If all the consumer instances have the same consumer group, this works as a traditional queue balancing the load over the consumers. If all the consumer instances have different consumer groups, this works as publish-subscribe, and all messages are broadcast to all consumers. | Yes | | |
thread.count | The number of threads. | No | | 1 |
consumer.id | The ID of the consumer. | No | | null |
socket.timeout.ms | The socket timeout for network requests. | No | | 30 * 1000 |
socket.receive.buffer.bytes | The socket receive buffer for network requests. | No | | 64 * 1024 |
fetch.message.max.bytes | The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. | No | | 1024 * 1024 |
num.consumer.fetchers | The number of fetcher threads used to fetch data. | No | | 1 |
auto.commit.enable | Whether the offset of messages already fetched by the consumer is committed periodically. The committed offset is used as the position from which a new consumer begins when the process fails. | No | true, false | true |
auto.commit.interval.ms | The frequency in milliseconds at which the consumer offsets are committed to ZooKeeper. | No | | 60 * 1000 |
queued.max.message.chunks | The maximum number of message chunks buffered for consumption. Each chunk can go up to the value specified in fetch.message.max.bytes. | No | | 2 |
rebalance.max.retries | The maximum number of retry attempts during rebalance. | No | | 4 |
fetch.min.bytes | The minimum amount of data the server should return for a fetch request. | No | | 1 |
fetch.wait.max.ms | The maximum amount of time the server blocks before responding to a fetch request when sufficient data is not available to immediately serve fetch.min.bytes. | No | | 100 |
rebalance.backoff.ms | The backoff time between retries during rebalance. | No | | 2000 |
refresh.leader.backoff.ms | The backoff time to wait before trying to determine the leader of a partition that has just lost its leader. | No | | 200 |
auto.offset.reset | What to do when there is no initial offset in ZooKeeper or an offset is out of range: smallest - automatically reset the offset to the smallest offset; largest - automatically reset the offset to the largest offset; anything else - throw an exception to the consumer. | No | smallest, largest, anything else | largest |
consumer.timeout.ms | The timeout interval after which a timeout exception is thrown to the consumer if no message is available for consumption. It is a good practice to set this value lower than the interval of the Kafka inbound endpoint (see the example following this table). | No | | 3000 |
exclude.internal.topics | Set to true if messages from internal topics, such as offsets, should be excluded from the consumer. | No | true, false | true |
partition.assignment.strategy | The partition assignment strategy to be used when assigning partitions to consumer streams. | No | range, roundrobin | range |
client.id | The user-specified string sent in each request to help trace calls. | No | | value of group.id |
zookeeper.session.timeout.ms | The ZooKeeper session timeout value in milliseconds. | No | | 6000 |
zookeeper.connection.timeout.ms | The maximum time in milliseconds that the client waits while establishing a connection to ZooKeeper. | No | | 6000 |
zookeeper.sync.time.ms | The time difference in milliseconds that a ZooKeeper follower can be behind a ZooKeeper leader. | No | | 2000 |
offsets.storage | The offsets storage location. | No | zookeeper, kafka | zookeeper |
offsets.channel.backoff.ms | The backoff period in milliseconds when reconnecting the offsets channel or retrying failed offset fetch/commit requests. | No | | 1000 |
offsets.channel.socket.timeout.ms | The socket timeout in milliseconds when reading responses for offset fetch/commit requests. | No | | 10000 |
offsets.commit.max.retries | The maximum number of retry attempts allowed for an offset commit. If a consumer metadata request fails for any reason, the retry takes place but does not count towards this limit. | No | | 5 |
dual.commit.enabled | If offsets.storage is set to kafka, offsets can additionally be committed to ZooKeeper (dual commit). Set this to true if you need to perform migration from ZooKeeper-based offset storage to Kafka-based offset storage. | No | true, false | true |
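As noted in the table, consumer.timeout.ms should generally be lower than the polling interval. The following is a hypothetical variant of the earlier high-level configuration that illustrates this guideline together with a couple of the optional tuning parameters; the endpoint name, topic, and values here are placeholders for illustration, not recommendations:

```xml
<!-- Hypothetical tuning example: consumer.timeout.ms (2000 ms) is kept lower than
     the polling interval (5000 ms), and two consumer threads are used. -->
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KafkaTunedListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFault"
                 protocol="kafka"
                 suspend="false">
   <parameters>
      <parameter name="interval">5000</parameter>
      <parameter name="coordination">true</parameter>
      <parameter name="sequential">true</parameter>
      <parameter name="zookeeper.connect">localhost:2181</parameter>
      <parameter name="consumer.type">highlevel</parameter>
      <parameter name="content.type">application/xml</parameter>
      <parameter name="topics">test</parameter>
      <parameter name="group.id">test-group</parameter>
      <parameter name="consumer.timeout.ms">2000</parameter>
      <parameter name="thread.count">2</parameter>
   </parameters>
</inboundEndpoint>
```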
Following is a sample high-level Kafka configuration that can be used to consume messages using a topic filter, which can either be a whitelist or a blacklist:
```xml
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KafkaListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFault"
                 protocol="kafka"
                 suspend="false">
   <parameters>
      <parameter name="interval">100</parameter>
      <parameter name="coordination">true</parameter>
      <parameter name="sequential">true</parameter>
      <parameter name="zookeeper.connect">localhost:2181</parameter>
      <parameter name="consumer.type">highlevel</parameter>
      <parameter name="content.type">application/xml</parameter>
      <parameter name="topic.filter">test</parameter>
      <parameter name="filter.from.whitelist">true</parameter>
      <parameter name="group.id">test-group</parameter>
   </parameters>
</inboundEndpoint>
```
In the above configuration, note that instead of the topics parameter, the following parameters are set to consume messages using a topic filter:

```xml
<parameter name="topic.filter">test</parameter>
<parameter name="filter.from.whitelist">true</parameter>
```
Parameter | Description | Required |
---|---|---|
topic.filter | The topic filter name. | Yes |
filter.from.whitelist | If set to true, messages are consumed from the whitelist (include). If set to false, messages are consumed from the blacklist (exclude). | Yes |
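Conversely, to consume from all topics except those matching the filter, filter.from.whitelist can be set to false. A minimal sketch, changing only the two filter parameters of the configuration above:

```xml
<!-- Hypothetical blacklist variant: consumes from all topics EXCEPT those matching "test". -->
<parameter name="topic.filter">test</parameter>
<parameter name="filter.from.whitelist">false</parameter>
```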
Following is a sample low-level Kafka configuration that can be used to consume messages from a specific server and a specific partition, so that the messages consumed are limited:
```xml
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                 name="KafkaListenerEP"
                 sequence="requestHandlerSeq"
                 onError="inFault"
                 protocol="kafka"
                 interval="1000"
                 suspend="false">
   <parameters>
      <parameter name="zookeeper.connect">localhost:2181</parameter>
      <parameter name="group.id">test-group</parameter>
      <parameter name="content.type">application/xml</parameter>
      <parameter name="consumer.type">simple</parameter>
      <parameter name="simple.max.messages.to.read">5</parameter>
      <parameter name="simple.topic">test</parameter>
      <parameter name="simple.brokers">localhost</parameter>
      <parameter name="simple.port">9092</parameter>
      <parameter name="simple.partition">1</parameter>
      <parameter name="interval">1000</parameter>
   </parameters>
</inboundEndpoint>
```
Kafka inbound endpoint parameters for a low-level configuration
Parameter | Description | Required |
---|---|---|
simple.topic | The category to feed the messages. | Yes |
simple.brokers | The specific Kafka broker name. | Yes |
simple.port | The specific Kafka server port number. | Yes |
simple.partition | The partition of the topic. | Yes |
simple.max.messages.to.read | The maximum number of messages to receive. | Yes |
Samples
For a sample that demonstrates how one-way message bridging from Kafka to HTTP can be done using the Kafka inbound endpoint, see Sample 904: Kafka Inbound Endpoint Sample.