from <stream-name> {<conditions>}#window.<window-name>(<parameters>)
select ( {<attribute-name>} | ‘*’ |)
insert [<output-type>] into <stream-name>
...
There are several types of windows.
1. lengthWindow Length windowswindow - a sliding window that keeps last N events.
2. Time window - a sliding window that keeps events arrived within the last T time period.
3. Time batch window - a time window that processes events in batches. A loop collects the incoming events arrived within last T time period, and outputs them as a batch.
4. Length batch window - a length window that outputs events as a batch only at the nth event arrival.
5. Time length window (not supported in the current version) - a sliding window that keeps the last N events that arrived within the last T time period.
6. Unique window - keeps only the latest events that are unique according to the given unique attribute.
7. First unique window - keeps the first events that are unique according to the given unique attribute.
8. External Time Window - a sliding window that processes according to timestamps defined externally (Defined as an attribute in the incoming stream)
...
1. sum
2. avg
3. max
4. min
5. count
6. median (not supported in current version)
7. stddev (not supported in current version)
8. avedev (not supported in current version)
Info | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
For most of the aggregate functions such as sum(), max() etc, the default output data type remains to be the same as the inputs. But for functions that involve division, such as avg(), the output type is double. For others that do not require floating points, such as count(), the output type is int.These Aggregate function gives different data types as an output based on the input data type. Please see the below table to get more information on this.
|
Aggregate function must be named using ‘as’ keyword. Thus name can be used for referring that attribute, and will be used as the attribute name in the output stream. Following examples shows some queries.
...
A sliding window that keeps last N events.
From the events having price >= 20 of the StockExchangeStream stream, output the expiring events of the length window to the StockQuote stream. Here the output events will have symbol and the per symbol average price as their attributes, only if the per symbol average price > 50.
from StockExchangeStream[price >= 20]#window.length(50)
select symbol, avg(price) as avgPrice
insert expired-events into StockQuote
group by symbol having avgPrice>50avgPrice>50
insert into StockQuote for expired-events
In the above query, avg(prize) is an aggregate function.
...
A sliding window that keeps events arrived within the last T time period.
From the events having symbol == 20 of the StockExchangeStream stream, output the both the newly arriving and expiring events of the time window to the IBMStockQuote stream. Here the output events will have maximum, average and minimum prices that has arrived within last minute as their attributes.
from StockExchangeStream[symbol == 'IBM']#window.time( 1 min )
select max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
insert all-events into IBMStockQuote insert into IBMStockQuote for all-events
Time batch window
Anchor | ||||
---|---|---|---|---|
|
A time window that processes events in batches. This in a loop collects the incoming events arrived within last T time period and outputs them as a batch.
From the events of the StockExchangeStream stream, output the events per every 2 minutes from the timeBatch window to the StockQuote stream. Here the output events will have symbol and the per symbol sum of volume for last 2 minutes as their attributes.
from StockExchangeStream#window.timeBatch( 2 min )
select symbol, sum(volume) as totalVolume
group by symbol
insert into StockQuote group by symbol
Length batch window
Anchor | ||||
---|---|---|---|---|
|
A length window that outputs events as a batch only at the nth event arrival.
From the events having price >= 20 of the StockExchangeStream stream, output the expiring events of the lengthBatch window to the StockQuote stream. Here the output events will have symbol and the per symbol average price> 50 as their attributes.
from StockExchangeStream[price >= 20]#window.lengthBatch(50)
select symbol, avg(price) as avgPrice
group by symbol having avgPrice>50
insert into StockQuote for expired-events into StockQuote
group by symbol
having avgPrice>50
Unique window
Anchor | ||||
---|---|---|---|---|
|
A window that keeps only the latest events that are unique according to the given attribute.
From the events of the StockExchangeStream stream, output the expiring events of the unique window to the StockQuote stream. The output events have symbol, price and volume as their attributes.
from StockExchangeStream#window.unique("symbol")
select symbol, price, volume
insert into StockQuote for expired-events into StockQuote
Info | ||
---|---|---|
| ||
Here, the output event is the immediate previous event having the same symbol of the current event. Unique window is mostly used in Join Queries, E.g If you want to get the current stock price of any symbol, you can join the SymbolStream (has an attribute symbol) with a Unique window as follows; from SymbolStream#window.lenght(1) unidirectional join StockExchangeStream#window.unique("symbol") |
First unique window
Anchor | ||||
---|---|---|---|---|
|
A window that keeps the first events that are unique according to the given unique attribute
From the events of the StockExchangeStream stream, output the events of the firstUnique window to the StockQuote stream. The output events have symbol, price and volume as their attributes.
from StockExchangeStream#window.firstUnique("symbol")
select symbol, price, volume
insert into StockQuote
Info | ||
---|---|---|
| ||
Here, the output event is the first event arriving for each symbol. FirstUnique window is mostly used in Join Queries, E.g If you want to know if a symbol has ever been traded in this StockExchange, you can join the SymbolStream (has an attribute symbol) with a First unique window as follows; from SymbolStream#window.lenght(1) unidirectional join StockExchangeStream#window.firstUnique("symbol") |
External Time Window
Anchor | ||||
---|---|---|---|---|
|
A window that would process time not according to the host system time, but according to timestamps provided externally by the input stream.
From the events of the LoginEvents stream, output the events of the login events of last 5 seconds based on the attribute 'timeStamp' of the stream. The output events have timeStamp, ip and count of login events during last 5 seconds as their attributes.
from LoginEvents#window.externalTime(timeStamp,5 sec)
select timeStamp, ip, count(timeStamp) as lastFiveSecLoginCount
insert into slidingFiveSecLoginInfo for all-events ;
Supported units for time windows
The following units are supported when specifying the time for a time window.
Unit | Syntax |
---|---|
Year | year | years |
Month | month | months |
Week | week | weeks |
Day | day | days |
Hour | hour | hours |
Minutes | minute | minutes | min |
Seconds | second | seconds | sec |
Milliseconds | millisecond | milliseconds |
Note that each unit supports both the singular and plural format (e.g. second, seconds) and some units have a shortened form (i.e. sec, min).
Following examples use different formats available for 'minutes'.
from StockExchangeStream[symbol == 'WSO2']#window.time( 1 minute )
select max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
insert into WSO2StockQuote for all-events
from StockExchangeStream[symbol == 'FBX']#window.timeBatch( 5 minutes )
select max(price) as maxPrice, avg(price) as avgPrice, volume
insert into FBStockQuote
from StockExchangeStream[symbol == 'FBX']#window.timeBatch( 5 min )
select max(price) as maxPrice, avg(price) as avgPrice, volume
insert into FBStockQuote
Excerpt | ||
---|---|---|
| ||
Siddhi windows in wiki format |
...