Consuming Events

Introduction

In the previous tutorial, you learned how to create a simple Siddhi application and how to send simulated events to it.

In this tutorial, let's look at how events are received in a real-world scenario, and how to use the event capturing framework of WSO2 SP to receive incoming events even when their format differs from the format specified in your Siddhi application.

Let's consider a scenario where the events generated by the Sweet Bots are in the JSON format, and they are sent via HTTP calls. The manager requires them to be received at a specific endpoint that has a specific URL.

This tutorial covers the following topics:

  • Event capturing with Siddhi sources
  • Using maps with sources
  • Custom mappings for inbound events

Before you begin:

  • This tutorial uses the same Siddhi application that is created and deployed in Tutorial 1: Creating a Simple Siddhi Application. Therefore, it is recommended that you try Tutorial 1 first.
  • Sources are the extensions that WSO2 Stream Processor uses to indicate to the runtime that events will arrive from the defined endpoint or entity. WSO2 Stream Processor supports many source types out of the box, including HTTP, TCP, and Kafka. For more information about the sources supported by WSO2 SP, see Collecting Events.

Tutorial steps

Let's get started!

To understand the difference between receiving events with default mapping and receiving them with custom mapping, this section is divided into two parts: configuring the Siddhi application to receive HTTP events, and adding custom mapping.

Configure Siddhi application to receive HTTP events

In this section, let's see how to add the required configurations to receive events via HTTP in JSON format.

  1. Let's open the SweetTotalsApp Siddhi application that you created in the previous tutorial. It currently looks as follows.

    @App:name('SweetTotalApp')
    
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Sweet Totals:')
    define stream SweetTotalStream(name string, totalProduction long);
    
    @info(name='SweetTotalQuery')
    from SweetProductionStream
    select name, sum(amount) as totalProduction
    group by name
    insert into SweetTotalStream;
  2. To receive the events generated by the Sweet Bots via HTTP calls, you need an event source of the http type. Let's add it as shown below.

    @source(type='http')
  3. The HTTP events need to be received at a specific endpoint. Let's add the URL of this endpoint to the HTTP source configuration as follows:

    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP')

  4. The incoming messages generated by Sweet Bots are in the JSON format. You require a mapping configuration to transform them into a format that can be processed by WSO2 SP. Therefore, let's add an annotation for JSON mapping to the HTTP source as shown below.

    For the complete list of mapping types supported for WSO2 SP, see Collecting Events - Event Format.

    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json'))

    Now the SweetTotalsApp Siddhi application looks as follows.

    @App:name('SweetTotalApp')
    
    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json'))
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Sweet Totals:')
    define stream SweetTotalStream(name string, totalProduction long);
    
    @info(name='SweetTotalQuery')
    from SweetProductionStream
    select name, sum(amount) as totalProduction
    group by name
    insert into SweetTotalStream;
  5. Save the SweetTotalsApp Siddhi application, and then start it so that it can process the events you send.

  6. To see whether the Siddhi application functions as expected, let's send two events with the following information.

    The input events generated by the Sweet Bots are in the JSON format. Therefore, the format is similar to the sample shown below.

    {
      "event": {
        "name": "Jaffa Cake",
        "amount": 10
      }
    }

    In order to send the two input events to WSO2 SP in the format shown above, issue the following two cURL commands.

    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Jaffa Cake",
        "amount": 10
      }
    }'
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "event": {
        "name": "Jaffa Cake",
        "amount": 15
      }
    }'

    The two events sent contain the following information.

    Input Event No    Name          Amount
    1                 Jaffa Cake    10
    2                 Jaffa Cake    15

    As a result, two output events are expected to be generated with the following information.

    Output Event No    Name          Amount
    1                  Jaffa Cake    10
    2                  Jaffa Cake    25

    These output events are logged in the CLI by the log sink, each with the Sweet Totals: prefix.
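
As a separate illustration (this is not WSO2 SP code or log output), the running totals in the table above can be reproduced with a short Python sketch. It assumes the default JSON layout, where the attributes sit under a top-level "event" object, and it mimics sum(amount) with group by name using a dictionary. The helper names map_default_json and running_totals are hypothetical and exist only for this example.

```python
import json
from collections import defaultdict


def map_default_json(payload):
    """Read a payload whose attributes sit under a top-level "event" object."""
    event = json.loads(payload)["event"]
    return event["name"], int(event["amount"])


def running_totals(payloads):
    """Yield (name, totalProduction) after each event, like sum() grouped by name."""
    totals = defaultdict(int)
    for payload in payloads:
        name, amount = map_default_json(payload)
        totals[name] += amount
        yield name, totals[name]


# The same two events that the cURL commands send.
payloads = [
    '{"event": {"name": "Jaffa Cake", "amount": 10}}',
    '{"event": {"name": "Jaffa Cake", "amount": 15}}',
]

outputs = list(running_totals(payloads))
for name, total in outputs:
    print(name, total)
# prints:
# Jaffa Cake 10
# Jaffa Cake 25
```

Each incoming event produces one output event, which is why the second output carries the accumulated total of 25 rather than 15.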

Add custom mapping

The previous section assumes that the message body of the events generated by the Sweet Bots exactly matches the schema of the SweetProductionStream input stream that is defined in your Siddhi application. Now let's consider a scenario where the attributes of the incoming JSON events differ from those of the input stream. Their format is as given in the sample below:

{
  "sweet": "Jaffa Cake",
  "batch": {
    "batch id": "batch1",
    "count": 10
  }
}

From this event, you only need to extract the values of the sweet and count attributes (count is nested under batch). Therefore, let's update the @map annotation as follows:

  1. To allow the Siddhi application to identify the elements that need to be extracted from the JSON message, add an @attributes annotation to the mapping configuration as follows.

    @map(type = 'json', @attributes(...))



  2. The JSON mapper supports JsonPath. Therefore, the values for the attributes annotation can be JsonPath expressions. The JsonPath expressions for the name of the sweet and its total production are as follows.

    Stream Attribute Name    JSON Event Attribute Name    JsonPath Expression
    name                     sweet                        $.sweet
    amount                   count                        $.batch.count

    Based on the JsonPath expressions identified above, you can update the @attributes annotation as follows.

    @attributes(name = '$.sweet', amount = '$.batch.count')

    Once the above substeps are completed, the mapping configuration looks as follows.

    @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count'))

    The complete Siddhi application looks as follows.

    @App:name('SweetTotalApp')
    
    @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
    define stream SweetProductionStream (name string, amount long);
    
    @sink(type='log', prefix='Sweet Totals:')
    define stream SweetTotalStream(name string, totalProduction long);
    
    @info(name='SweetTotalQuery')
    from SweetProductionStream
    select name, sum(amount) as totalProduction
    group by name
    insert into SweetTotalStream;
  3. Save the changes you made to the SweetTotalsApp Siddhi application, and then start it so that it can process the events you send.

  4. Now you can issue the following cURL commands to send the two incoming events as sent by the Sweet Bots.

    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 10
      }
    }'
    
    
    curl -X POST \
      http://localhost:5005/SweetProductionEP \
      -H 'content-type: application/json' \
      -d '{
      "sweet": "Jaffa Cake",
      "batch": {
        "batch id": "batch1",
        "count": 15
      }
    }'
    
    

As a result, you should see the same output in the CLI logs as you saw when you carried out step 6 of the previous section.
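
To see why the output is unchanged, it can help to trace what the custom mapping extracts from one of these payloads. The Python sketch below is only an illustration under stated assumptions, not how WSO2 SP implements its JSON mapper. It resolves the two JsonPath expressions from the @attributes annotation against the sample event. The names ATTRIBUTE_PATHS, resolve_path, and map_custom_json are hypothetical, and the resolver handles only plain dotted paths such as $.batch.count, not the full JsonPath syntax.

```python
import json

# Mapping copied from the @attributes annotation: stream attribute -> JsonPath.
ATTRIBUTE_PATHS = {"name": "$.sweet", "amount": "$.batch.count"}


def resolve_path(document, path):
    """Resolve a simple dotted JsonPath such as '$.batch.count'.

    Only the '$.key.key' form is handled; filters, wildcards, and array
    indexes that full JsonPath supports are out of scope for this sketch.
    """
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = document
    for key in path[2:].split("."):
        value = value[key]
    return value


def map_custom_json(payload):
    """Build a stream event (attribute -> value) from a custom-format payload."""
    document = json.loads(payload)
    return {attr: resolve_path(document, path)
            for attr, path in ATTRIBUTE_PATHS.items()}


payload = '{"sweet": "Jaffa Cake", "batch": {"batch id": "batch1", "count": 10}}'
event = map_custom_json(payload)
print(event)
# prints: {'name': 'Jaffa Cake', 'amount': 10}
```

The mapped event carries the same name and amount values as the default-format event in the previous section, and the batch id attribute is ignored, so the query computes the same totals.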
