
Sample 904: Inbound Endpoint Kafka Protocol Sample

Note that WSO2 EI ships with the following changes to the paths mentioned in this documentation:

  • The <PRODUCT_HOME>/repository/samples/ directory, which includes all Integration profile samples, is now <EI_HOME>/samples/service-bus/.
  • The <PRODUCT_HOME>/repository/samples/resources/ directory, which includes all artifacts related to the Integration profile samples, is now <EI_HOME>/samples/service-bus/resources/.

Introduction

This sample demonstrates one-way message bridging from Kafka to HTTP using the Kafka inbound endpoint.

Prerequisites

  • Download and install Apache Kafka. For more information, see the Apache Kafka documentation.
  • Copy the following client libraries from the <KAFKA_HOME>/lib directory to the <EI_HOME>/lib directory:

    • kafka_2.9.2-0.8.1.1.jar
    • scala-library-2.9.2.jar
    • zkclient-0.3.jar
    • zookeeper-3.3.4.jar
    • metrics-core-2.2.0.jar

    Note

    • If you are using kafka_2.x.x-0.8.2.0 or later, you also need to add the kafka-clients-0.8.x.x.jar file to the <EI_HOME>/lib directory.
    • If you are using a newer version of ZooKeeper, follow the steps below:

        1. Create a directory named conf inside the <EI_HOME>/repository directory.
        2. Create a directory named identity inside the <EI_HOME>/repository/conf directory.
        3. Add the jaas.conf file to the <EI_HOME>/repository/conf/identity directory. This is required because Kerberos authentication is enforced on newer versions of ZooKeeper.
  • Navigate to <KAFKA_HOME> and run the following command to start the ZooKeeper server:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    The ZooKeeper startup log appears in the console.

  • Run the following command to start the Kafka server:

    bin/kafka-server-start.sh config/server.properties

    The Kafka server startup log appears in the console.
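
The file-preparation prerequisites above (copying the client libraries and, for newer ZooKeeper versions, placing jaas.conf) can be sketched as two small shell helpers. This is only an illustration: the function names copy_kafka_libs and install_jaas_conf and their argument order are assumptions made for this example, not commands that ship with Kafka or WSO2 EI. The jar names are the ones listed above; adjust them to the versions in your Kafka distribution, and check the name of the library directory in your installation before passing it in.

```shell
#!/bin/sh
# Sketch only: helper names and argument order are illustrative,
# not part of WSO2 EI or Apache Kafka.

# Copy the five client libraries listed above into <EI_HOME>/lib.
# $1 = directory that holds the Kafka jars, $2 = EI installation directory.
copy_kafka_libs() {
    kafka_lib_dir="$1"
    ei_home="$2"
    for jar in kafka_2.9.2-0.8.1.1.jar scala-library-2.9.2.jar zkclient-0.3.jar \
               zookeeper-3.3.4.jar metrics-core-2.2.0.jar; do
        # Stop at the first jar that is missing or cannot be copied.
        cp "$kafka_lib_dir/$jar" "$ei_home/lib/" || return 1
    done
}

# Create <EI_HOME>/repository/conf/identity and place jaas.conf in it.
# $1 = EI installation directory, $2 = path to your own jaas.conf file.
install_jaas_conf() {
    ei_home="$1"
    jaas_file="$2"
    mkdir -p "$ei_home/repository/conf/identity" || return 1
    cp "$jaas_file" "$ei_home/repository/conf/identity/jaas.conf"
}
```

For example, after loading these functions into your shell, copy_kafka_libs "$KAFKA_HOME/lib" "$EI_HOME" copies the jars, and install_jaas_conf "$EI_HOME" ./jaas.conf installs a jaas.conf file that you have prepared yourself. Both helpers return a non-zero status on failure, so a missing jar is reported instead of being silently skipped.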

Building the sample

The XML configuration for this sample is as follows:

<definitions xmlns="http://ws.apache.org/ns/synapse">
    <inboundEndpoint xmlns="http://ws.apache.org/ns/synapse"
                     name="KAFKAListenerEP"
                     sequence="TestIn"
                     onError="fault"
                     protocol="kafka"
                     suspend="false">
        <parameters>
            <parameter name="interval">10</parameter>
            <parameter name="consumer.type">highlevel</parameter>
            <parameter name="content.type">application/xml</parameter>
            <parameter name="coordination">false</parameter>
            <parameter name="sequential">false</parameter>
            <parameter name="topics">test</parameter>
            <parameter name="zookeeper.connect">localhost:2181</parameter>
            <parameter name="group.id">test-1</parameter>
        </parameters>
    </inboundEndpoint>

    <sequence xmlns="http://ws.apache.org/ns/synapse" name="TestIn">
        <log level="full"/>
        <drop/>
    </sequence>
</definitions>

This configuration file, synapse_sample_904.xml, is available in the <ESB_HOME>/repository/samples directory.

To build the sample

  • Start the ESB with the sample 904 configuration. For instructions on starting a sample ESB configuration, see Starting the ESB with a sample configuration.
    The operation log keeps running until the server starts, which usually takes several seconds. Wait until the server has fully booted up and displays a message similar to "WSO2 Carbon started in n seconds."

Executing the sample

  • Run the following on the Kafka command line to create a topic named test with a single partition and only one replica:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • Run the following on the Kafka command line to send a message to the Kafka brokers. You can also use the WSO2 ESB Kafka producer connector to send the message to the Kafka brokers.

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
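
The console producer reads messages from standard input and sends each input line as a separate message, so a message can also be piped in instead of typed. The sketch below sends the getQuote envelope shown under "Analyzing the output" as a single line. The helper name send_sample_message and its argument are assumptions made for this example; the broker address and topic are the ones used in the commands above.

```shell
#!/bin/sh
# Sketch only: send_sample_message is an illustrative helper,
# not a Kafka or WSO2 command.
# $1 = Kafka installation directory (<KAFKA_HOME>).
send_sample_message() {
    kafka_home="$1"
    # Keep the whole envelope on one line: the console producer
    # treats every input line as its own message.
    message='<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing"><soapenv:Body><m0:getQuote xmlns:m0="http://services.samples"><m0:request><m0:symbol>IBM</m0:symbol></m0:request></m0:getQuote></soapenv:Body></soapenv:Envelope>'
    printf '%s\n' "$message" |
        "$kafka_home/bin/kafka-console-producer.sh" --broker-list localhost:9092 --topic test
}
```

After loading the function into your shell, send_sample_message "$KAFKA_HOME" publishes one message and exits once standard input is closed, which can be convenient when the sample is run repeatedly.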

Analyzing the output


The ESB log shows the following message content:

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsa="http://www.w3.org/2005/08/addressing"><soapenv:Body><m0:getQuote xmlns:m0="http://services.samples">
<m0:request><m0:symbol>IBM</m0:symbol></m0:request></m0:getQuote></soapenv:Body></soapenv:Envelope>

The Kafka inbound endpoint receives the messages from the Kafka brokers and logs them in the ESB.

