Sample 904: Inbound Endpoint Kafka Protocol Sample
Introduction
This sample demonstrates how to bridge one-way messages from Kafka to HTTP using the Kafka inbound endpoint.
Prerequisites
Download and install Apache Kafka. For more information, see the Apache Kafka documentation.
Copy the following client libraries from the <KAFKA_HOME>/lib directory to the <EI_HOME>/lib directory:
kafka_2.9.2-0.8.1.1.jar
scala-library-2.9.2.jar
zkclient-0.3.jar
zookeeper-3.3.4.jar
metrics-core-2.2.0.jar
Run the following command to start the ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties
The ZooKeeper server prints its startup log to the console.
Run the following command to start the Kafka server:
bin/kafka-server-start.sh config/server.properties
The Kafka server prints its startup log to the console.
Building the sample
The XML configuration for this sample is as follows:
<definitions xmlns="http://ws.apache.org/ns/synapse">
    <inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                     name="KAFKAListenerEP"
                     sequence="TestIn"
                     onError="fault"
                     protocol="kafka"
                     suspend="false">
        <parameters>
            <parameter name="interval">10</parameter>
            <parameter name="consumer.type">highlevel</parameter>
            <parameter name="content.type">application/xml</parameter>
            <parameter name="coordination">false</parameter>
            <parameter name="sequential">false</parameter>
            <parameter name="topics">test</parameter>
            <parameter name="zookeeper.connect">localhost:2181</parameter>
            <parameter name="group.id">test-1</parameter>
        </parameters>
    </inboundEndpoint>
    <sequence xmlns="http://ws.apache.org/ns/synapse" name="TestIn">
        <log level="full"/>
        <drop/>
    </sequence>
</definitions>
This configuration file, synapse_sample_904.xml, is available in the <ESB_HOME>/repository/samples directory.
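The TestIn sequence in this sample only logs each message and then drops it. To forward the messages to an HTTP service instead, the sequence could be adapted along the following lines. This is a minimal sketch and not part of the shipped sample: the backend URL is an assumption (a placeholder for any HTTP service you run locally), and the OUT_ONLY property is set on the assumption that no response should be routed back, since a message consumed from Kafka has no reply channel.

```xml
<!-- Hypothetical variant of the TestIn sequence: forwards each Kafka message over HTTP instead of dropping it. -->
<sequence xmlns="http://ws.apache.org/ns/synapse" name="TestIn">
    <log level="full"/>
    <!-- Treat the exchange as one-way: no response is expected from the backend. -->
    <property name="OUT_ONLY" value="true"/>
    <send>
        <endpoint>
            <!-- Assumed backend URL; replace it with the address of your own HTTP service. -->
            <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
        </endpoint>
    </send>
</sequence>
```

With a sequence like this in place of the original TestIn definition, the inbound endpoint configuration itself would not need to change, because it refers to the sequence only by name.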
To build the sample
Start the ESB with the sample 904 configuration. For instructions on starting a sample ESB configuration, see Starting the ESB with a sample configuration.
The operation log keeps running until the server starts, which usually takes several seconds. Wait until the server has fully booted up and displays a message similar to "WSO2 Carbon started in n seconds."
Executing the sample
Run the following on the Kafka command line to create a topic named test with a single partition and only one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Run the following on the Kafka command line to send a message to the Kafka brokers. You can also use the WSO2 ESB Kafka producer connector to send the message to the Kafka brokers.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Analyzing the output
You will see the following message content:
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing"><soapenv:Body><m0:getQuote xmlns:m0="http://services.samples">
<m0:request><m0:symbol>IBM</m0:symbol></m0:request></m0:getQuote></soapenv:Body></soapenv:Envelope>
The Kafka inbound endpoint gets the messages from the Kafka brokers and logs them in the ESB.