This documentation is for WSO2 CEP 2.1.0.

Windows

from <stream-name> {<conditions>}#window.<window-name>(<parameters>)
insert [<output-type>] into <stream-name> ( {<attribute-name>}| ‘*’|)

A window is a limited subset of events from an event stream. Users can define a window and then use the events in the window for calculations. A window has two types of output: current events and expired events. A window emits a current event when a new event arrives, and an expired event whenever an existing event expires from the window.

There are several types of windows.

1. Length window - a sliding window that keeps the last N events.
2. Time window - a sliding window that keeps the events that arrived within the last T time period.
3. Time batch window - a time window that processes events in batches. It repeatedly collects the events that arrive within a time period T and outputs them as a batch.
4. Length batch window - a length window that outputs events as a batch only at the nth event arrival.
5. Time length window (not supported in the current version) - a sliding window that keeps the last N events that arrived within the last T time period.
6. Unique window - keeps only the latest events that are unique according to the given unique attribute.
7. First unique window - keeps the first events that are unique according to the given unique attribute.

Siddhi queries can have three different output types: ‘current-events’, ‘expired-events’ and ‘all-events’. Users can define these output types by adding the following keywords in between ‘insert’ and ‘into’ in the syntax.

  • ‘current-events’ keyword : The output is only triggered when new events arrive at the window. Notifications will not be given when the expired events trigger the query from the window.
  • ‘expired-events’ keyword : The query emits output only when the expired events trigger it from the window and not from new events.
  • ‘all-events’ keyword : The query emits output when it is triggered by both newly-arrived and expired events from the window.
  • No keyword is given : By default, the query assigns ‘current-events’ to its output stream.
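The effect of the three output types can be sketched outside Siddhi. The following Python snippet is only an illustration, not Siddhi code: the function `select_output` and the `(kind, event)` pair representation are hypothetical names chosen for this sketch. It assumes a window hands the query a sequence of emissions, each marked as current or expired, and shows which of them reach the output stream for each keyword.

```python
# Illustration only: hypothetical helper, not part of Siddhi or WSO2 CEP.
CURRENT, EXPIRED = "current", "expired"

def select_output(emissions, output_type=None):
    """Keep the (kind, event) emissions that match the given output type."""
    if output_type is None:
        # No keyword given: the query defaults to current-events.
        output_type = "current-events"
    wanted = {
        "current-events": {CURRENT},
        "expired-events": {EXPIRED},
        "all-events": {CURRENT, EXPIRED},
    }[output_type]
    return [event for kind, event in emissions if kind in wanted]

# e1 arrives, e2 arrives, e1 expires, e3 arrives.
emissions = [(CURRENT, "e1"), (CURRENT, "e2"), (EXPIRED, "e1"), (CURRENT, "e3")]
print(select_output(emissions))                     # default output type
print(select_output(emissions, "expired-events"))
print(select_output(emissions, "all-events"))
```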

In output event streams, users can define aggregate functions to calculate aggregations within the defined window. CEP supports the following types of aggregate functions.

1. sum
2. avg
3. max
4. min
5. count
6. median (not supported in current version)
7. stddev (not supported in current version) 
8. avedev (not supported in current version) 

Each aggregate function must be named using the ‘as’ keyword. This name can be used to refer to the aggregated value, and it becomes the attribute name in the output stream. The following examples show some queries.

The queries below use the stream defined by "define stream StockExchangeStream (symbol string, price int, volume float );".

Length window

A sliding window that keeps the last N events.

From the events of the StockExchangeStream stream that have price >= 20, output the expiring events of the length window to the StockQuote stream. The output events have the symbol and the per-symbol average price as their attributes, and are emitted only if the per-symbol average price > 50.

from StockExchangeStream[price >= 20]#window.length(50) 
insert expired-events into StockQuote symbol, avg(price) as avgPrice
group by symbol
having avgPrice>50

In the above query, avg(price) is an aggregate function.
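How a length window produces current and expired events can be illustrated with a small Python simulation. This is a sketch of the sliding behaviour described above, not the Siddhi implementation: the class name `LengthWindow` and its `add` method are hypothetical, and the sketch does not claim anything about the order in which Siddhi delivers the two kinds of events.

```python
from collections import deque

class LengthWindow:
    """Hypothetical sketch of a sliding window that keeps the last N events."""

    def __init__(self, length):
        self.length = length
        self.events = deque()

    def add(self, event):
        """Return (current_event, expired_events) for one arrival."""
        expired = []
        if len(self.events) == self.length:
            # The window is full, so the oldest event expires.
            expired.append(self.events.popleft())
        self.events.append(event)
        return event, expired

window = LengthWindow(3)
for price in [10, 20, 30, 40]:
    current, expired = window.add(price)
    print(current, expired, list(window.events))
```

With a window of length 3, nothing expires during the first three arrivals; the fourth arrival pushes out the oldest event.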

Time window

A sliding window that keeps the events that arrived within the last T time period.

From the events of the StockExchangeStream stream that have symbol == 'IBM', output both the newly arriving and the expiring events of the time window to the IBMStockQuote stream. The output events have the maximum, average and minimum prices of the events that arrived within the last minute as their attributes.

from StockExchangeStream[symbol == 'IBM']#window.time( 1 min ) 
insert all-events into IBMStockQuote max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
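The sliding behaviour of a time window can be sketched in Python as follows. This is an illustration under stated assumptions, not the Siddhi implementation: the class `TimeWindow` is hypothetical, timestamps are passed in explicitly as milliseconds, expiry is only checked when a new event arrives (a real engine would use a timer), and an event is assumed to expire once it is at least T old.

```python
from collections import deque

class TimeWindow:
    """Hypothetical sketch of a sliding window over the last duration_ms."""

    def __init__(self, duration_ms):
        self.duration_ms = duration_ms
        self.events = deque()  # (timestamp_ms, event) pairs, oldest first

    def expire(self, now_ms):
        """Remove and return the events that are at least duration_ms old."""
        expired = []
        while self.events and self.events[0][0] <= now_ms - self.duration_ms:
            expired.append(self.events.popleft()[1])
        return expired

    def add(self, now_ms, event):
        """Return (current_event, expired_events) for one arrival."""
        expired = self.expire(now_ms)
        self.events.append((now_ms, event))
        return event, expired

    def values(self):
        return [event for _, event in self.events]

window = TimeWindow(60_000)  # 1 minute
window.add(0, 100)
window.add(30_000, 120)
current, expired = window.add(61_000, 90)
prices = window.values()
print(expired, prices)
print(max(prices), sum(prices) / len(prices), min(prices))
```

The last line mirrors the max, avg and min aggregates of the query above, computed over the prices still inside the window.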

Time batch window

A time window that processes events in batches. It repeatedly collects the events that arrive within a time period T and outputs them as a batch.

From the events of the StockExchangeStream stream, output the events of the timeBatch window to the StockQuote stream every 2 minutes. The output events have the symbol and the per-symbol sum of volume for the last 2 minutes as their attributes.

from StockExchangeStream#window.timeBatch( 2 min )
insert into StockQuote symbol, sum(volume) as totalVolume
group by symbol
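The batching behaviour, together with the per-symbol sum, can be sketched in Python. This is an illustration only: `TimeBatchWindow` and `total_volume_by_symbol` are hypothetical names, timestamps are explicit milliseconds, the first batch is assumed to start at time 0, and a batch is only closed when a later event arrives (a real engine would close it on a timer).

```python
from collections import defaultdict

class TimeBatchWindow:
    """Hypothetical sketch: collect events for duration_ms, then emit them together."""

    def __init__(self, duration_ms, start_ms=0):
        self.duration_ms = duration_ms
        self.batch_end = start_ms + duration_ms
        self.pending = []

    def flush(self, now_ms):
        """Return the closed batch, or None if the current batch is still open."""
        if now_ms < self.batch_end:
            return None
        batch, self.pending = self.pending, []
        # Skip past any intervals in which nothing arrived.
        while self.batch_end <= now_ms:
            self.batch_end += self.duration_ms
        return batch

    def add(self, now_ms, event):
        """Close the previous batch if it is due, then store the new event."""
        batch = self.flush(now_ms)
        self.pending.append(event)
        return batch

def total_volume_by_symbol(batch):
    """Per-symbol sum of volume, like sum(volume) ... group by symbol."""
    totals = defaultdict(float)
    for symbol, volume in batch:
        totals[symbol] += volume
    return dict(totals)

window = TimeBatchWindow(120_000)  # 2 minutes
window.add(0, ("IBM", 100.0))
window.add(60_000, ("WSO2", 50.0))
window.add(90_000, ("IBM", 25.0))
batch = window.add(120_000, ("IBM", 10.0))  # closes the first batch
print(total_volume_by_symbol(batch))
```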

Length batch window

A length window that outputs events as a batch only at the nth event arrival.

From the events of the StockExchangeStream stream that have price >= 20, output the expiring events of the lengthBatch window to the StockQuote stream. The output events have the symbol and the per-symbol average price as their attributes, and are emitted only if that average is > 50.

from StockExchangeStream[price >= 20]#window.lengthBatch(50) 
insert expired-events into StockQuote symbol, avg(price) as avgPrice
group by symbol
having avgPrice>50
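The difference from the sliding length window is that nothing is emitted until the batch is full. A minimal Python sketch of that behaviour follows; the class `LengthBatchWindow` is a hypothetical name used for illustration and is not Siddhi code.

```python
class LengthBatchWindow:
    """Hypothetical sketch: emit events as a batch at every Nth arrival."""

    def __init__(self, length):
        self.length = length
        self.pending = []

    def add(self, event):
        """Return the full batch on the Nth arrival, otherwise None."""
        self.pending.append(event)
        if len(self.pending) < self.length:
            return None
        batch, self.pending = self.pending, []
        return batch

window = LengthBatchWindow(3)
for price in [10, 20, 30, 40]:
    print(price, window.add(price))
```

Only the third arrival returns a batch; the fourth event starts a new one.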

Unique window

A window that keeps only the latest events that are unique according to the given attribute.

From the events of the StockExchangeStream stream, output the expiring events of the unique window to the StockQuote stream. The output events have symbol, price and volume as their attributes.

from StockExchangeStream#window.unique("symbol") 
insert expired-events into StockQuote symbol, price, volume

Info

Here, the output event is the immediately preceding event that has the same symbol as the current event.

The unique window is mostly used in join queries. For example, if you want to get the current stock price of any symbol, you can join the SymbolStream (which has a symbol attribute) with a unique window as follows:

from SymbolStream#window.length(1) unidirectional join StockExchangeStream#window.unique("symbol")
insert into StockQuote StockExchangeStream.symbol as symbol, StockExchangeStream.price as lastTradedPrice


You can find a sample at http://ushanib.blogspot.com/2013/02/sample-demonstrate-unique-window-and.html
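The unique window behaviour described in this section can be sketched in Python. This is an illustration, not Siddhi code: the class `UniqueWindow` and its methods are hypothetical, and events are represented as plain dictionaries. The window keeps one event per key, so a new event for a symbol expires the previous event for that symbol, and a lookup by symbol plays the role of the join.

```python
class UniqueWindow:
    """Hypothetical sketch: keep only the latest event per unique attribute value."""

    def __init__(self, key):
        self.key = key
        self.latest = {}

    def add(self, event):
        """Return (current_event, expired_event); expired_event may be None."""
        value = event[self.key]
        expired = self.latest.get(value)
        self.latest[value] = event
        return event, expired

    def lookup(self, value):
        """Latest event for the given attribute value, as a join would see it."""
        return self.latest.get(value)

window = UniqueWindow("symbol")
window.add({"symbol": "IBM", "price": 75, "volume": 100.0})
window.add({"symbol": "WSO2", "price": 57, "volume": 20.0})
current, expired = window.add({"symbol": "IBM", "price": 78, "volume": 50.0})
print(expired)                         # the previous IBM event
print(window.lookup("IBM")["price"])   # last traded price for IBM
```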

First unique window

A window that keeps the first events that are unique according to the given unique attribute.

From the events of the StockExchangeStream stream, output the events of the firstUnique window to the StockQuote stream. The output events have symbol, price and volume as their attributes.

from StockExchangeStream#window.firstUnique("symbol") 
insert into StockQuote symbol, price, volume

Info

Here, the output event is the first event arriving for each symbol.

The firstUnique window is mostly used in join queries. For example, if you want to know whether a symbol has ever been traded in this StockExchange, you can join the SymbolStream (which has a symbol attribute) with a first unique window as follows:

from SymbolStream#window.length(1) unidirectional join StockExchangeStream#window.firstUnique("symbol")
insert into AvailableSymbolStream StockExchangeStream.symbol as symbol


You can find a sample at http://ushanib.blogspot.com/2013/02/sample-demonstrate-unique-window-and.html
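For comparison with the unique window, a first unique window can be sketched in Python as follows. This is an illustration, not Siddhi code: `FirstUniqueWindow` and `has_traded` are hypothetical names, and events are plain dictionaries. Only the first event for each symbol is kept and emitted; later events with the same symbol are ignored.

```python
class FirstUniqueWindow:
    """Hypothetical sketch: keep only the first event per unique attribute value."""

    def __init__(self, key):
        self.key = key
        self.first = {}

    def add(self, event):
        """Return the event if it is the first for its key, otherwise None."""
        value = event[self.key]
        if value in self.first:
            return None
        self.first[value] = event
        return event

    def has_traded(self, value):
        """True if any event with this attribute value has been seen."""
        return value in self.first

window = FirstUniqueWindow("symbol")
print(window.add({"symbol": "IBM", "price": 75}))   # emitted
print(window.add({"symbol": "IBM", "price": 78}))   # ignored: None
print(window.has_traded("IBM"), window.has_traded("WSO2"))
```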

Supported units for time windows

The following units are supported when specifying the time for a time window.

Unit          Syntax
Year          year | years
Month         month | months
Week          week | weeks
Day           day | days
Hour          hour | hours
Minutes       minute | minutes | min
Seconds       second | seconds | sec
Milliseconds  millisecond | milliseconds

Note that each unit supports both the singular and plural format (e.g. second, seconds) and some units have a shortened form (i.e. sec, min).

The following examples use the different formats available for 'minutes'.

from StockExchangeStream[symbol == 'WSO2']#window.time( 1 minute ) 
select max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
insert into WSO2StockQuote for all-events  

from StockExchangeStream[symbol == 'FBX']#window.timeBatch( 5 minutes ) 
select max(price) as maxPrice, avg(price) as avgPrice, volume
insert into FBStockQuote

from StockExchangeStream[symbol == 'FBX']#window.timeBatch( 5 min ) 
select max(price) as maxPrice, avg(price) as avgPrice, volume
insert into FBStockQuote
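To show that the singular, plural and shortened forms denote the same duration, the following Python sketch converts a time expression such as "5 minutes" into milliseconds. It is an illustration only and not how Siddhi parses time values: `to_millis` is a hypothetical function, and year and month are deliberately left out because this page does not state how long those units are taken to be.

```python
# Milliseconds per unit, for units with a fixed length.
# Assumption: year and month are omitted because their length is not given here.
_UNIT_MS = {
    "millisecond": 1,
    "second": 1_000,
    "minute": 60_000,
    "hour": 3_600_000,
    "day": 86_400_000,
    "week": 604_800_000,
}
_SHORT_FORMS = {"sec": "second", "min": "minute"}

def to_millis(text):
    """Convert '<amount> <unit>' (for example '1 min') to milliseconds."""
    amount, unit = text.split()
    unit = _SHORT_FORMS.get(unit, unit)
    # Accept the plural form by dropping a trailing 's'.
    if unit.endswith("s") and unit[:-1] in _UNIT_MS:
        unit = unit[:-1]
    if unit not in _UNIT_MS:
        raise ValueError("unsupported unit in this sketch: " + unit)
    return int(amount) * _UNIT_MS[unit]

for text in ["1 minute", "5 minutes", "5 min", "2 sec"]:
    print(text, "->", to_millis(text))
```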

 

Copyright © WSO2 Inc. 2005-2014