Publishing Processed Events
Introduction
In the previous tutorials, you learned how to capture events with WSO2 Stream Processor (WSO2 SP) and how to preprocess them. In all of those tutorials, the output was published as logs in the console.
In this tutorial, let's look at how WSO2 SP can publish events to external endpoints. As with receiving events, WSO2 SP lets you publish events over different transports and in different formats.
In the previous tutorial, you wrote a query to identify the hours during which the per hour production was less than 5000. Let's consider a scenario where the factory foreman needs to send an email to the factory manager when there is such a shortage in the production of a sweet.
Tutorial steps
Let's get started!
Start the editor, and log in to the WSO2 Stream Processor Studio. Then open the PastHourSweetProductionApp Siddhi application that you created in Tutorial 5. It already has the following queries to identify the hours during which the total production of a sweet has been less than 5000.
define stream SweetProductionStream (name string, amount long);
define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
from SweetProductionStream#window.time(1 hour)
select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
group by name
having hourlyTotal < 5000 and currentHour > 9 and currentHour < 17
insert into LowProductionStream;
To send an email when the production of a sweet falls below 5000 during an hour, you need to configure an email sink. Let's add it to the Siddhi application as follows.
@sink(type='email')
define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
This sink configuration is added above the LowProductionStream output stream definition to indicate that it is connected to this stream. The events sent via this sink are taken from the LowProductionStream output stream.
For the email to reach the factory manager, the sweet factory foreman needs to send it from the factory's email address, factory012@sweets-r-us.com, to the factory manager's email address, bossman@sweets-r-us.com. Taking this into account, let's update the email sink as follows:
@sink(type='email', address='factory012@sweets-r-us.com', username='factory012', password='secret_password', subject='Low production alert', to='bossman@sweets-r-us.com')
define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
This configuration includes the following information.
username: The email username of the sender. Note that this is the part of the address before the @ sign, without the domain (e.g., without @gmail.com).
address: The email address of the sender.
password: The email password of the sender.
subject: The subject of the email.
to: The recipient's email address.
At present, the sink configuration does not have a mapping configuration (similar to the one you have already defined for the HTTP event source). Without the mapping, this configuration only allows you to publish events in the same format in which they are sent to WSO2 SP to be processed. This format is not human-readable, and email sinks do not support it. Therefore, let's add a mapping configuration by following these sub-steps.
The published event must be in a format that can be read by humans. Therefore, you can add a basic mapping of the text type.
@map(type = 'text')
To transform the output event into a readable text message, the @payload annotation needs to be added in the following format.
@map(type = 'text', @payload("Some text body"))
Within the text payload, the stream attributes can be specified via the {{ }} placeholder so that the runtime values are dynamically assigned. For example, if the stream has an attribute named attr1, a payload body that refers to this attribute looks as follows.
@payload("The value is {{attr1}}")
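To make the substitution concrete, the following Python sketch mimics how a {{ }} placeholder could be resolved against the attributes of an event. It is an illustration only and does not show how WSO2 SP implements the text mapper. The fill_placeholders helper and the sample event are hypothetical names introduced here.

```python
import re

def fill_placeholders(template, event):
    """Replace each {{attribute}} in the template with the event's value."""
    def lookup(match):
        # match.group(1) is the attribute name between the double braces
        return str(event[match.group(1)])
    return re.sub(r"\{\{(\w+)\}\}", lookup, template)

# Hypothetical event carrying the attr1 attribute from the example above
message = fill_placeholders("The value is {{attr1}}", {"attr1": 42})
print(message)
```

Each event that reaches the sink would yield its own message in this way, because the placeholder is resolved per event rather than once when the application is deployed.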
When the total production per hour falls below 5000, the following information in the output event varies:
- The name of the sweet category
- The exact amount produced during the hour
- The specific hour during which the total production was less than 5000
Let's compose the body of the email to be sent to the factory manager via the @payload annotation, with a placeholder for each detail mentioned above.
@payload("Hello,
The production of {{name}} has fallen to below 5000 units, to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).
This message was generated automatically.")
Note that {{name}}, {{hourlyTotal}}, and {{currentHour}} are placeholders for the attributes in the LowProductionStream stream, to which the sink is bound.
Now, let's look at the completed output stream definition with the sink, with the line breaks in the message represented by the \n newline character.
@sink(type='email', address='factory012@sweets-r-us.com', username='factory012', password='secret_password', subject='Low production alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has fallen to below 5000 units, to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).\n\nThis message was generated automatically.")))
define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
The completed Siddhi application looks as follows:
@App:name('LowSweetProductionApp')
@source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json', @attributes(name = '$.sweet', amount = '$.batch.count')))
define stream SweetProductionStream (name string, amount long);
@sink(type='email', address='factory012@sweets-r-us.com', username='factory012', password='secret_password', subject='Low production alert', to='bossman@sweets-r-us.com', @map(type = 'text', @payload("Hello,\n\nThe production of {{name}} has fallen to below 5000 units, to {{hourlyTotal}} in the past hour (hour {{currentHour}} of the day).\n\nThis message was generated automatically.")))
define stream LowProductionStream (name string, hourlyTotal long, currentHour int);
from SweetProductionStream#window.time(1 hour)
select name, sum(amount) as hourlyTotal, time:extract(currentTimeMillis(), 'HOUR') as currentHour
group by name
having hourlyTotal < 5000 and currentHour > 9 and currentHour < 17
insert into LowProductionStream;
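To see how an incoming request flows through this application, the Python sketch below mimics two pieces of it: the JSON paths in the @attributes mapping and the condition in the having clause. It is only an illustration under assumptions. The request bodies, the sweet name, and the helper names map_event and should_alert are hypothetical, and the real one-hour sliding window is reduced here to a plain sum over events assumed to fall within the past hour.

```python
import json

def map_event(body):
    """Apply the JSON paths from the @attributes mapping ($.sweet, $.batch.count)."""
    data = json.loads(body)
    return {"name": data["sweet"], "amount": data["batch"]["count"]}

def should_alert(hourly_total, current_hour):
    """Mirror the having clause: low production between hours 10 and 16."""
    return hourly_total < 5000 and 9 < current_hour < 17

# Hypothetical request bodies received by the HTTP source during the past hour
bodies = [
    '{"sweet": "Toffee", "batch": {"count": 1500}}',
    '{"sweet": "Toffee", "batch": {"count": 2000}}',
]
events = [map_event(b) for b in bodies]

# Stand-in for sum(amount) grouped by name over the one-hour window
hourly_total = sum(e["amount"] for e in events if e["name"] == "Toffee")

print(hourly_total)                    # total for the sample events
print(should_alert(hourly_total, 11))  # within the monitored hours
print(should_alert(hourly_total, 18))  # outside the monitored hours
```

With these sample values, the total stays below 5000, so an alert would be expected only when the current hour lies between 10 and 16 inclusive.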