
Publishing Processed Events

Introduction

In the previous tutorials, you learned how to capture events with WSO2 Stream Processor and how to preprocess them. In all of those tutorials, you published the output as logs in the console.

In this tutorial, let's look at how WSO2 Stream Processor (WSO2 SP) can publish events to external endpoints. As with receiving events, WSO2 SP lets you publish events over different transports and in different formats.

In the previous tutorial, you wrote a query to identify the hours during which the per hour production was less than 5000. Let's consider a scenario where the factory foreman needs to send an email to the factory manager when there is such a shortage in the production of a sweet.

This tutorial covers the following concepts:

  • Publishing events with Siddhi sinks

  • Custom mappings for sinks

Before you begin:

Tutorial steps

Let's get started!

  1. Start the editor, and log in to the WSO2 Stream Processor Studio. Then open the PastHourSweetProductionApp Siddhi application that you created in Tutorial 5. It already has the following queries to identify the hours during which the total production of a sweet has been less than 5000.

    define stream SweetProductionStream (name string, amount long);

    define stream LowProductionStream (name string, hourlyTotal long, currentHour int);

    from SweetProductionStream#window.time(1 hour)
    select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
    group by name
    having hourlyTotal < 5000 and currentHour > 9 and currentHour < 17
    insert into LowProductionStream;

  2. To send an email when the production of a sweet falls below 5000 during an hour, you need to configure an email sink. Let's add it to the Siddhi application as follows.

    @sink(type='email')
    define stream LowProductionStream (name string, hourlyTotal long, currentHour int);

    This sink configuration is added above the LowProductionStream output stream definition to indicate that it is connected to this stream. The events sent via this sink are taken from the LowProductionStream output stream.

  3. For the email to reach the factory manager, the sweet factory foreman needs to send it from the factory's email address, which is factory012@sweets-r-us.com, to the factory manager's email address, which is bossman@sweets-r-us.com. Taking this into account, let's update the email sink as follows:

    @sink(type='email', address='factory012@sweets-r-us.com', username='factory012', password='secret_password', subject='Low production alert', to='bossman@sweets-r-us.com')
    define stream LowProductionStream (name string, hourlyTotal long, currentHour int);

    This configuration includes the following information.

    • username: The email username of the sender. (Note: specify only the part of the address before the @ sign, without the domain, e.g., without @gmail.com.)

    • address: The email address of the sender.

    • password: The email password of the sender.

    • subject: The subject of the email.

    • to: The recipient's email address.

  4. At present, the sink configuration does not have a mapping configuration (similar to what you have already defined for the HTTP event source). Without a mapping, the sink can only publish events in the same format in which they are sent to WSO2 SP to be processed. That format is not human-readable, and email sinks do not support it. Therefore, let's add a mapping configuration by following these substeps.
    1. The published event must be in a format that can be read by humans. Therefore, you can add a basic mapping of the text type.

      @map(type = 'text')

    2. To transform the output event into a readable text message, the @payload annotation needs to be added in the following format.

      @map(type = 'text', @payload("Some text body"))

      Within the text payload, the stream variables can be specified via the {{ }} placeholder so that the runtime values are dynamically assigned. For example, if the stream has an attribute named attr1, a payload body that refers to this attribute looks as follows.

      @payload("The value is {{attr1}}")


      When the total production per hour falls below 5000, the following information in the output event varies:

      • The name of the sweet category

      • The exact amount produced during the hour

      • The specific hour during which the total production was less than 5000

      Let's compose the body of the email to be sent to the factory manager via the @payload annotation with a placeholder for each detail mentioned above.

      @payload("Hello,

      The production of {{name}} has fallen to below 5000 units, to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).

      This message was generated automatically.")

      Note that {{name}}, {{hourlyTotal}} and {{currentHour}} are placeholders for the attributes in the LowProductionStream stream, to which the sink is bound.


      Now, let's look at the completed output stream definition with the sink, and with the newlines represented by the \n newline character.

      @sink(type='email', address='factory012@sweets-r-us.com', username='factory012', password='secret_password', subject='Low production alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has fallen to below 5000 units, to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).\n\nThis message was generated automatically.")))

      define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
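
To get a feel for what the text mapping produces, the following Python sketch fills the {{ }} placeholders of the email body with the attribute values of one event. It is only an illustration of the substitution idea, not how Siddhi implements it: the render_payload helper and the sample event values are assumptions made up for this example.

```python
import re

# Hypothetical helper that mimics filling {{attribute}} placeholders.
# Illustration only; this is not Siddhi's implementation.
PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")


def render_payload(template: str, event: dict) -> str:
    """Replace each {{name}} with the matching attribute value of the event."""
    return PLACEHOLDER.sub(lambda m: str(event[m.group(1)]), template)


# Same body as the @payload annotation, with \n for the line breaks.
TEMPLATE = (
    "Hello,\n\n"
    "The production of {{name}} has fallen to below 5000 units, "
    "to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).\n\n"
    "This message was generated automatically."
)

# Sample values are invented for the example.
sample_event = {"name": "toffee", "hourlyTotal": 4200, "currentHour": 14}
email_body = render_payload(TEMPLATE, sample_event)
print(email_body)
```

Each placeholder name must match an attribute of the stream to which the sink is bound. In this sketch, a placeholder with no matching key raises a KeyError.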


The completed Siddhi application looks as follows:

@App:name('LowSweetProductionApp')

@source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
define stream SweetProductionStream (name string, amount long);

@sink(type='email', address='factory012@sweets-r-us.com', username='factory012', password='secret_password', subject='Low production alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has fallen to below 5000 units, to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).\n\nThis message was generated automatically.")))
define stream LowProductionStream (name string, hourlyTotal long, currentHour int);

from SweetProductionStream#window.time(1 hour)
select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
group by name
having hourlyTotal < 5000 and currentHour > 9 and currentHour < 17
insert into LowProductionStream;
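
As a rough way to reason about when this application sends an email, the Python sketch below mirrors two parts of it: the JSON attribute mapping of the HTTP source (name from $.sweet, amount from $.batch.count) and the condition in the having clause. It is a simplified model under stated assumptions: the function names and sample payloads are hypothetical, and the one-hour sliding window is replaced by a plain per-name sum over a fixed list of events.

```python
import json


def map_event(raw: str) -> dict:
    """Mirror the @attributes mapping: name = $.sweet, amount = $.batch.count."""
    doc = json.loads(raw)
    return {"name": doc["sweet"], "amount": doc["batch"]["count"]}


def is_low_production(hourly_total: int, current_hour: int) -> bool:
    """Mirror the having clause: total below 5000, hour between 10 and 16."""
    return hourly_total < 5000 and 9 < current_hour < 17


# Invented sample payloads, all assumed to arrive within the same hour.
raw_events = [
    '{"sweet": "toffee", "batch": {"count": 1200}}',
    '{"sweet": "gateau", "batch": {"count": 6000}}',
    '{"sweet": "toffee", "batch": {"count": 3000}}',
]

# Stand-in for "sum(amount) ... group by name" over the window.
totals = {}
for raw in raw_events:
    event = map_event(raw)
    totals[event["name"]] = totals.get(event["name"], 0) + event["amount"]

# Assume the current hour of the day is 14.
alerts = sorted(n for n, total in totals.items() if is_low_production(total, 14))
print(alerts)  # ['toffee']
```

With the same totals at hour 8, no sweet would trigger an alert, because the having clause only passes hours greater than 9 and less than 17.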