Sample 0104 - Calculations over time using Windows
Introduction
This sample demonstrates how to set up an execution plan to perform calculations over time by aggregating events. The queries use time windows and time batch windows to aggregate events over time. This sample uses the Event Simulator for inputs and the logger publisher for logging the custom output events to the DAS console. Custom events are events with custom mappings that do not adhere to the default event formats. For more information on event formats, see Event Formats.
The queries used in the WindowBasedAvgTemp execution plan for this sample are as follows:
-- with time sliding window of 1 min
from TempStream#window.time(1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempStream ;

-- with time batch (tumbling) window of 1 min
from TempStream#window.timeBatch(1 min)
select roomNo, avg(temp) as avgTemp
group by roomNo
insert all events into AvgRoomTempPerMinStream ;
The first query does the following:
- Processes the events received through the TempStream stream.
- Defines a sliding time window of 1 minute that keeps each arriving event for 1 minute.
- Selects the attributes roomNo and avg(temp) from the events stored in the time window. Because of the group by clause, the average is calculated per roomNo. The average of the temp values is named avgTemp.
- The all events clause in the insert statement causes the query to be triggered by both current events and expired events. Current events are the events arriving at the window. An expired event is an event emitted by the window after being kept for 1 minute.
- Emits these events as output events through the AvgRoomTempStream stream.
- Mathematically, this query calculates the moving average of the room temperature for each room and gives instantaneous results upon the arrival or expiration of each event.
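The behaviour described for the first query can be imitated outside the server to see what a per-room moving average looks like. The following is a minimal Python sketch, not Siddhi code: the function name `sliding_avg`, the `(timestamp_ms, room_no, temp)` tuple layout, and the output tuple layout are assumptions made for illustration. It also assumes that expiry is processed lazily when the next event arrives, so events still inside the window at the end of the input are never expired.

```python
from collections import deque

WINDOW_MS = 60_000  # 1 minute, mirroring window.time(1 min)


def sliding_avg(events):
    """Simulate a per-room moving average over a sliding 1-minute window.

    events: iterable of (timestamp_ms, room_no, temp), sorted by timestamp.
    Returns a list of (trigger_time_ms, kind, room_no, avg_temp), where kind
    is "current" (an event arrived) or "expired" (an event left the window).
    """
    window = deque()  # events currently held, oldest first
    totals = {}       # room_no -> [sum_of_temps, count]
    out = []

    def emit(trigger_ts, kind, room):
        total, count = totals[room]
        # Hypothetical choice: report None when the room has no events left.
        out.append((trigger_ts, kind, room, total / count if count else None))

    def expire_until(now):
        # Drop every event that has been held for a full window length.
        while window and window[0][0] + WINDOW_MS <= now:
            ts, room, temp = window.popleft()
            totals[room][0] -= temp
            totals[room][1] -= 1
            emit(ts + WINDOW_MS, "expired", room)

    for ts, room, temp in events:
        expire_until(ts)
        window.append((ts, room, temp))
        agg = totals.setdefault(room, [0.0, 0])
        agg[0] += temp
        agg[1] += 1
        emit(ts, "current", room)
    return out


if __name__ == "__main__":
    sample = [(0, "r1", 20.0), (30_000, "r1", 30.0), (70_000, "r1", 40.0)]
    for row in sliding_avg(sample):
        print(row)
```

In this sketch every arrival and every expiry produces one output row, which corresponds to the role of the all events clause in the query.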
The second query does the following:
- Processes the events received through the TempStream stream.
- Defines a time batch window of 1 minute that keeps all incoming events and then emits them periodically, once every minute.
- Selects the attributes roomNo and avg(temp) from the events stored in the time batch window. Because of the group by clause, the average is calculated per roomNo. The average of the temp values is named avgTemp.
- The all events clause in the insert statement causes the query to be triggered by both current events and expired events. Current events are the events arriving at the window. An expired event is an event emitted by the window after being kept in the window for 1 minute.
- Emits those events as output events through the AvgRoomTempPerMinStream stream.
- Similar to the first query, this also calculates an average of the temperature for each room, but over non-overlapping 1-minute batches, and emits the results once every minute.
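To contrast the batch behaviour with the sliding window, the following Python sketch groups events into non-overlapping 1-minute batches and computes one average per room per batch. It is an illustration only: the name `batch_avg` and the tuple layouts are assumptions, and the batches are assumed to be aligned to timestamp 0, which may differ from how the server aligns its batches.

```python
from collections import defaultdict

BATCH_MS = 60_000  # 1 minute, mirroring window.timeBatch(1 min)


def batch_avg(events):
    """Simulate a per-room average over tumbling 1-minute batches.

    events: iterable of (timestamp_ms, room_no, temp).
    Returns a list of (batch_end_ms, room_no, avg_temp), one row per room
    for each batch that received at least one event for that room.
    """
    # batch index -> room_no -> temperatures collected in that batch
    batches = defaultdict(lambda: defaultdict(list))
    for ts, room, temp in events:
        batches[ts // BATCH_MS][room].append(temp)

    out = []
    for index in sorted(batches):
        batch_end = (index + 1) * BATCH_MS  # results appear when the batch closes
        for room, temps in sorted(batches[index].items()):
            out.append((batch_end, room, sum(temps) / len(temps)))
    return out


if __name__ == "__main__":
    sample = [
        (0, "r1", 20.0),
        (30_000, "r1", 30.0),
        (45_000, "r2", 10.0),
        (70_000, "r1", 40.0),
    ]
    for row in batch_avg(sample):
        print(row)
```

Unlike the sliding-window case, an event here contributes to exactly one batch, so results are produced once per batch rather than on every arrival.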
Another execution plan continuously calculates the average temperature from the beginning. It includes the following query.
from TempStream
select roomNo, avg(temp) as avgTemp
insert into AvgTempFromStartStream ;
The third query does the following:
- Processes the events received through the TempStream stream.
- Selects roomNo together with avg(temp), renamed as avgTemp, which is the average of the temperature calculated from the start. As written, the query has no window and no group by clause, so the average covers all events received so far rather than each room separately.
- Emits those events as output events through the AvgTempFromStartStream stream.
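The third query keeps no window, so its aggregate grows for as long as the execution plan runs. A minimal Python sketch of such a running average follows; the name `running_avg` and the `(room_no, temp)` tuple layout are assumptions for illustration. Because the query as shown has no group by clause, the sketch keeps a single sum and count for all events. A per-room variant would keep one sum and count pair per roomNo.

```python
def running_avg(events):
    """Simulate an average computed over every event seen since the start.

    events: iterable of (room_no, temp).
    Returns a list of (room_no, avg_temp): one output row per input event,
    carrying the room of the triggering event and the average so far.
    """
    total = 0.0
    count = 0
    out = []
    for room, temp in events:
        total += temp
        count += 1
        out.append((room, total / count))
    return out


if __name__ == "__main__":
    for row in running_avg([("r1", 20.0), ("r2", 30.0), ("r1", 40.0)]):
        print(row)
```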
Prerequisites
Set up the prerequisites required for all samples.
Building the sample
Start the WSO2 DAS server with the sample configuration numbered 0104. For instructions, see Starting sample CEP configurations.
This sample configuration points the default Axis2 repo to <DAS_HOME>/samples/cep/artifacts/0104 (by default, the Axis2 repo is <DAS_HOME>/repository/deployment/server).
Executing the sample
- Log into the DAS Management Console via the https://<DAS_HOST>:<DAS_PORT>/carbon URL.
- Go to Tools -> Event Simulator. Under the Multiple Events section, you can see the listed events.csv file that contains some sample data. Click Play to start sending sample events from the file.
- View the output events in the DAS console. This sample uses the logger adaptor to log output events to the console. Because this execution plan uses 1-minute time windows, observe the results for a few minutes to see the output of all the queries.
The output events are logged in the CLI as shown in the following example.