from <stream-name> {<conditions>}#window.<window-name>(<parameters>)
insert [<output-type>] into <stream-name> select ( {<attribute-name>} | ‘*’ |)
insert [<output-type>] into <stream-name>
Window is a limited subset of events from an event stream. Users can define a window and then use the events on the window for calculations. A window has two types of output: current events and expired events. A window emits current events when a new event arrives. Expired events are emitted whenever an existing event has expired from a window.
...
A sliding window that keeps last N events.
From the events having price >= 20 of the StockExchangeStream stream, output the expiring events of the length window to the StockQuote stream. Here the output events will have symbol and the per symbol average price as their attributes, only if the per symbol average price > 50.
from StockExchangeStream[price >= 20]#window.length(50)
insert expired-events into StockQuote select symbol, avg(price) as avgPrice
insert expired-events into StockQuote
group by symbol
having avgPrice>50
In the above query, avg(prize) is an aggregate function.
...
A sliding window that keeps events arrived within the last T time period.
From the events having symbol == 20 of the StockExchangeStream stream, output the both the newly arriving and expiring events of the time window to the IBMStockQuote stream. Here the output events will have maximum, average and minimum prices that has arrived within last minute as their attributes.
from StockExchangeStream[symbol == 'IBM']#window.time( 1 min )
insert all-events into IBMStockQuote select max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
insert all-events into IBMStockQuote
Time batch window
Anchor | ||||
---|---|---|---|---|
|
A time window that processes events in batches. This in a loop collects the incoming events arrived within last T time period and outputs them as a batch.
From the events of the StockExchangeStream stream, output the events per every 2 minutes from the timeBatch window to the StockQuote stream. Here the output events will have symbol and the per symbol sum of volume for last 2 minutes as their attributes.
from StockExchangeStream#window.timeBatch( 2 min )
insert into StockQuote select symbol, sum(volume) as totalVolume
insert into StockQuote
group by symbol
Length batch window
Anchor | ||||
---|---|---|---|---|
|
A length window that outputs events as a batch only at the nth event arrival.
From the events having price >= 20 of the StockExchangeStream stream, output the expiring events of the lengthBatch window to the StockQuote stream. Here the output events will have symbol and the per symbol average price> 50 as their attributes.
from StockExchangeStream[price >= 20]#window.lengthBatch(50)
insert expired-events into StockQuote select symbol, avg(price) as avgPrice
insert expired-events into StockQuote
group by symbol
having avgPrice>50
...
A window that keeps only the latest events that are unique according to the given attribute.
From the events of the StockExchangeStream stream, output the expiring events of the unique window to the StockQuote stream. The output events have symbol, price and volume as their attributes.
from StockExchangeStream#window.unique("symbol")
select symbol, price, volume
insert expired-events into StockQuote symbol, price, volumeStockQuote
Info | ||
---|---|---|
| ||
Here, the output event is the immediate previous event having the same symbol of the current event. Unique window is mostly used in Join Queries, E.g If you want to get the current stock price of any symbol, you can join the SymbolStream (has an attribute symbol) with a Unique window as follows; from SymbolStream#window.lenght(1) unidirectional join StockExchangeStream#window.unique("symbol")
|
...
A window that keeps the first events that are unique according to the given unique attribute
From the events of the StockExchangeStream stream, output the events of the firstUnique window to the StockQuote stream. The output events have symbol, price and volume as their attributes.
from StockExchangeStream#window.firstUnique("symbol")
insert into StockQuote select symbol, price, volume
insert into StockQuote
Info | ||
---|---|---|
| ||
Here, the output event is the first event arriving for each symbol. FirstUnique window is mostly used in Join Queries, E.g If you want to know if a symbol has ever been traded in this StockExchange, you can join the SymbolStream (has an attribute symbol) with a First unique window as follows; from SymbolStream#window.lenght(1) unidirectional join StockExchangeStream#window.firstUnique("symbol")
|
...