All streams that cannot be derived from queries must be defined before use.

The following example shows a sample stream definition.

It defines a stream called StockExchangeStream with three attributes: symbol of type string, price of type int, and volume of type float.
        define stream StockExchangeStream (symbol string, price int, volume float );

Queries

Filter

from <stream-name> {<conditions>}
insert into <stream-name> ( {<attribute-name>}| ‘*’|)

A filter query creates an output stream and inserts into it every event from the input stream that satisfies the conditions defined in the filter.
Filters support the following types of conditions:

1. >, <, ==, >=, <=, !=
2. contains, instanceof
3. and, or, not

The following example shows a sample query for a filter.

From all events of the StockExchangeStream stream, output only the events having price >= 20 and symbol == IBM to the StockQuote stream, where the output event will only have symbol and volume as its attributes.

from StockExchangeStream[price >= 20 and symbol=='IBM']
insert into StockQuote symbol, volume 

Here we are projecting only ‘symbol’ and ‘volume’ as the output of the query, and hence the output stream ‘StockQuote’ will contain only the ‘symbol’ and ‘volume’ attributes. If the projection is omitted or ‘*’ is used, the query will output all the attributes of the input streams.
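
The filter-and-project behaviour described above can be mimicked in plain Python. This is only an illustrative sketch, not Siddhi code: events are modelled as dictionaries, and the function name filter_query and its parameters are hypothetical.

```python
def filter_query(events, condition, projection=None):
    """Yield events that satisfy `condition`, keeping only the projected attributes.

    A projection of None stands in for an omitted projection or '*'.
    """
    for event in events:
        if not condition(event):
            continue  # the event does not satisfy the filter, so it is dropped
        if projection is None:
            yield dict(event)  # all attributes of the input event
        else:
            yield {name: event[name] for name in projection}


# Hypothetical input events for StockExchangeStream.
stock_exchange_stream = [
    {"symbol": "IBM", "price": 25, "volume": 100.0},
    {"symbol": "IBM", "price": 10, "volume": 50.0},
    {"symbol": "WSO2", "price": 30, "volume": 75.0},
]

# Mirrors: from StockExchangeStream[price >= 20 and symbol=='IBM']
#          insert into StockQuote symbol, volume
stock_quote = list(filter_query(
    stock_exchange_stream,
    lambda e: e["price"] >= 20 and e["symbol"] == "IBM",
    projection=("symbol", "volume"),
))
print(stock_quote)
```

Only the first event passes both conditions, and the output event carries just the two projected attributes.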

Window

from <stream-name> {<conditions>}#window.<window-name>(<parameters>)
insert [<output-type>] into <stream-name> ( {<attribute-name>}| ‘*’|)

A window is a limited subset of events from an event stream. Users can define a window and then use the events in the window for calculations.

A window has two types of output: current events and expired events. A window emits a current event when a new event arrives at the window, and an expired event whenever a previously arrived event expires from the window.
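
The distinction between current and expired events can be illustrated with a small Python sketch of a sliding length window. This is not Siddhi code: the class name LengthWindow and its add method are hypothetical, and the sketch makes no claim about the order in which Siddhi itself delivers the two kinds of events.

```python
from collections import deque


class LengthWindow:
    """Sliding window that keeps the last `length` events (illustrative only)."""

    def __init__(self, length):
        self.length = length
        self._events = deque()

    def add(self, event):
        """Add an event and return (current_events, expired_events)."""
        expired = []
        self._events.append(event)
        if len(self._events) > self.length:
            # The oldest event no longer fits in the window, so it expires.
            expired.append(self._events.popleft())
        return [event], expired


window = LengthWindow(2)
log = [window.add(n) for n in (1, 2, 3)]
print(log)
```

With a window of length 2, the third arrival is reported as a current event and pushes the first event out as an expired event.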

There are several types of windows.

1. Length window - a sliding window that keeps the last N events.
2. Time window - a sliding window that keeps the events that arrived within the last T time period.
3. Time batch window - a time window that processes events in batches. It repeatedly collects the events that arrive within each T time period and outputs them as a batch.
4. Length batch window - a length window that outputs events as a batch only at the arrival of the Nth event.
5. Time length window (not supported in 1.0) - a sliding window that keeps the last N events that arrived within the last T time period.
6. Unique window (not supported in 1.0) - keeps only the latest events that are unique according to the given unique attribute.
7. First unique window (not supported in 1.0) - keeps the first events that are unique according to the given unique attribute.
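
To contrast batch windows with sliding windows, here is a minimal Python sketch of a length batch window. It is an illustration under assumptions, not Siddhi's implementation: the class name LengthBatchWindow is hypothetical, and the sketch models only the batch of current events, not expired events.

```python
class LengthBatchWindow:
    """Collects events and releases them as one batch at every Nth arrival."""

    def __init__(self, length):
        self.length = length
        self._pending = []

    def add(self, event):
        """Add an event; return the full batch on the Nth arrival, else an empty list."""
        self._pending.append(event)
        if len(self._pending) < self.length:
            return []  # the batch is not complete yet, so nothing is output
        batch, self._pending = self._pending, []
        return batch


batch_window = LengthBatchWindow(2)
outputs = [batch_window.add(n) for n in (1, 2, 3)]
print(outputs)
```

Unlike a sliding length window, nothing is output for the first and third arrivals here; the second arrival releases both collected events together.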

Siddhi queries can have three different output types: ‘current-events’, ‘expired-events’ and ‘all-events’. Users select an output type by adding the corresponding keyword between ‘insert’ and ‘into’ in the syntax. With ‘current-events’, output is triggered only when new events arrive at the window; nothing is emitted when the query is triggered by events expiring from the window. Similarly, with ‘expired-events’, the query emits output only when it is triggered by events expiring from the window, not by new events. With ‘all-events’, output is emitted when the query is triggered by either newly arrived or expired events. When no keyword is given, the query defaults to ‘current-events’.
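
The three output types amount to a choice of which window events reach the output stream. The following Python sketch shows that choice; the function name select_output is hypothetical, and placing current events before expired events for ‘all-events’ is an assumption of this sketch, not documented Siddhi behaviour.

```python
def select_output(current, expired, output_type="current-events"):
    """Return the events a query would forward for the given output type."""
    if output_type == "current-events":
        return list(current)
    if output_type == "expired-events":
        return list(expired)
    if output_type == "all-events":
        # Ordering of the two groups is an assumption made for illustration.
        return list(current) + list(expired)
    raise ValueError("unknown output type: " + output_type)


# Suppose a window just received event 3 and expired event 1.
default_output = select_output([3], [1])
expired_output = select_output([3], [1], "expired-events")
all_output = select_output([3], [1], "all-events")
print(default_output, expired_output, all_output)
```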

The following example shows a sample query.

From the events of the StockExchangeStream stream having price >= 20, output the expiring events of the lengthBatch window to the StockQuote stream. The output events have the symbol and the per-symbol average price as their attributes, and only events whose average price is greater than 50 are output.

from StockExchangeStream[price >= 20]#window.lengthBatch(50) 
insert expired-events into StockQuote symbol, avg(price) as avgPrice
group by symbol
having avgPrice>50

In the output event stream, users can define aggregate functions via projections. For example, avg(price) in the above query is such an aggregate function.

We support the following types of aggregate functions.

1. sum
2. avg
3. max
4. min
5. count
6. median (not supported in 1.0)
7. stddev (not supported in 1.0) 
8. avedev (not supported in 1.0) 

An aggregate function must be named using the ‘as’ keyword. This name is used to refer to the aggregated attribute and becomes its attribute name in the output stream.
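
The combination of ‘group by’, an aggregate named with ‘as’, and ‘having’ can be sketched in Python as follows. This is a simplification for illustration: it emits one output event per group for a single batch, which may differ from how Siddhi emits aggregated events, and the function name average_price_by_symbol is hypothetical.

```python
from collections import defaultdict


def average_price_by_symbol(batch, min_average):
    """Group a batch by symbol, compute avg(price) as avgPrice, keep avgPrice > min_average."""
    prices = defaultdict(list)
    for event in batch:
        prices[event["symbol"]].append(event["price"])  # group by symbol

    output = []
    for symbol, values in prices.items():
        avg_price = sum(values) / len(values)           # avg(price) as avgPrice
        if avg_price > min_average:                     # having avgPrice > min_average
            output.append({"symbol": symbol, "avgPrice": avg_price})
    return output


# Hypothetical batch of events released by a window.
batch = [
    {"symbol": "IBM", "price": 40},
    {"symbol": "IBM", "price": 80},
    {"symbol": "WSO2", "price": 30},
    {"symbol": "WSO2", "price": 50},
]
result = average_price_by_symbol(batch, 50)
print(result)
```

The name given with ‘as’ (avgPrice here) is what both the ‘having’ condition and the output event refer to.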

Join

from <stream>#<window> [unidirectional]
join <stream>#<window> [unidirectional]
[on <condition>] [within <time>]
insert [<output-type>] into <stream-name> ( {<attribute-name>}| ‘*’)

...

Outputs, via JoinStream, the matching events from the last 2000 TickEvent events and the NewsEvent events that have arrived within the last 500 msec.

from TickEvent[symbol=='IBM']#window.length(2000) join
NewsEvent#window.time(500)
insert into JoinStream *

Join can be in multiple forms

1. join - inner join
2. [((left|right|full) outer) | inner] join - only inner join is supported in version 1.0

When we join two streams, the events arriving at either stream will trigger a joining process. Siddhi also supports a special ‘unidirectional’ join. Here only one stream (the stream defined with the ‘unidirectional’ keyword ) will trigger the joining process.

The following is a sample unidirectional join query.

When an event arrives at the TickEvent stream, it is matched against all NewsEvent events that have arrived within the last 500 msec. If the TickEvent’s symbol == the NewsEvent’s company, an output event is generated and sent via JoinStream.

from TickEvent[symbol=='IBM']#window.length(2000) as t unidirectional
join NewsEvent#window.time(500) as n
on t.symbol == n.company
insert into JoinStream *

Here the join is triggered only when events arrive at the TickEvent stream.
When no projection is given or when ‘*’ is used, the output stream will contain the attributes of both input events, and the output attributes will be named <input-stream-name>_<attribute> to maintain uniqueness.
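
A unidirectional join and the <input-stream-name>_<attribute> naming rule can be illustrated with the following Python sketch. It is not Siddhi code: the function name unidirectional_join, its parameters, and the sample event attributes (price, headline) are hypothetical, and the other stream's window is passed in as a plain list.

```python
def unidirectional_join(trigger_name, trigger_event, other_name, other_window, on):
    """Join one arriving event against the events held in the other stream's window.

    Only an arrival on the triggering stream calls this function, which is what
    makes the join unidirectional in this sketch.
    """
    joined = []
    for other_event in other_window:
        if on(trigger_event, other_event):
            # Prefix every attribute with its stream name to keep names unique.
            out = {f"{trigger_name}_{k}": v for k, v in trigger_event.items()}
            out.update({f"{other_name}_{k}": v for k, v in other_event.items()})
            joined.append(out)
    return joined


# Hypothetical contents of the NewsEvent time window when a TickEvent arrives.
news_window = [
    {"company": "IBM", "headline": "a"},
    {"company": "WSO2", "headline": "b"},
]
tick = {"symbol": "IBM", "price": 75}

join_stream = unidirectional_join(
    "TickEvent", tick, "NewsEvent", news_window,
    on=lambda t, n: t["symbol"] == n["company"],
)
print(join_stream)
```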

Pattern

from [every] <stream> -> [every] <stream> ... <stream> within <time>
insert into <stream-name> <attribute-name> {<attribute-name>}

...

The following example shows a simple pattern query.

If an event arriving at Stream1 with price >= 20 is followed by an event arriving at Stream2 with price >= the first event’s price, an output event is triggered via the StockQuote stream. The output event has two attributes: the first event’s symbol and the second event’s price.

from e1=Stream1[price >= 20] -> e2=Stream2[price >= e1.price]
insert into StockQuote e1.symbol as symbol, e2.price as price

Every and Within Keywords

Without the “every” keyword, the query will only run once. If “every” encloses a pattern, the query runs for every occurrence of that pattern.
Furthermore, if “within <time>” is used, Siddhi triggers only the patterns where the first and the last events constituting the pattern have arrived within the given time period. In the following example, a1 and b1 should arrive within 3000 msec of each other, as specified.

For every infoStock event having action == "buy" followed by a confirmOrder event having command == "OK", the StockExchangeStream event will be matched when its price is greater than the infoStock event’s price.

from every (a1 = infoStock[action == "buy"]
       -> a2 = confirmOrder[command == "OK"] )
       -> b1 = StockExchangeStream [price > a1.price]
within 3000
insert into StockQuote
      a1.action as action, b1.price as price
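
The effect of the “every” keyword can be illustrated in plain Python using the simpler two-step pattern Stream1[price >= 20] -> Stream2[price >= e1.price]. This is a simplified sketch, not Siddhi's matching engine: the function name match_pattern and the sample events are hypothetical, and “within” is not modelled.

```python
def match_pattern(events, first, second, every=False):
    """Match 'first -> second' over (stream_name, event) pairs in arrival order."""
    pending = []      # first-stage events still waiting for a second-stage event
    matches = []      # completed (e1, e2) pairs
    accepting = True  # whether a new first-stage event may still start a match

    for stream, event in events:
        # Try to complete the partial matches with the newly arrived event.
        waiting = []
        for e1 in pending:
            if second(stream, event, e1):
                matches.append((e1, event))
            else:
                waiting.append(e1)
        pending = waiting

        # Start a new partial match; without 'every' this happens only once.
        if accepting and first(stream, event):
            pending.append(event)
            accepting = every
    return matches


def is_first(stream, event):
    return stream == "Stream1" and event["price"] >= 20


def is_second(stream, event, e1):
    return stream == "Stream2" and event["price"] >= e1["price"]


def project(matches):
    # Mirrors: e1.symbol as symbol, e2.price as price
    return [{"symbol": e1["symbol"], "price": e2["price"]} for e1, e2 in matches]


arrivals = [
    ("Stream1", {"symbol": "IBM", "price": 25}),
    ("Stream1", {"symbol": "WSO2", "price": 30}),
    ("Stream2", {"symbol": "IBM", "price": 28}),
    ("Stream2", {"symbol": "WSO2", "price": 35}),
]

once = project(match_pattern(arrivals, is_first, is_second))
repeated = project(match_pattern(arrivals, is_first, is_second, every=True))
print(once)
print(repeated)
```

Without “every”, only the first Stream1 event starts a match, so a single output event is produced. With “every”, each qualifying Stream1 event starts its own match, so the second Stream1 event also produces an output once a Stream2 event with a high enough price arrives.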

Logical Operations

You can combine streams in patterns using logical OR and AND.
1. and - occurrence of two events in any order
2. or - occurrence of an event from either of the streams in any order

The following example shows a sample query. It waits for the ‘buy’ action from both OrderStock1 and OrderStock2 before matching the prices in StockExchangeStream.

For every OrderStock1 event with action == "buy" and OrderStock2 event with action == "buy", the StockExchangeStream will be matched for events having price > 70 followed by events having price > 75. 

from every a1 = OrderStock1[action == "buy"] and
                a2 = OrderStock2[action == "buy"] ->
                b1 = StockExchangeStream[price > 70] ->
                b2 = StockExchangeStream[price > 75]
insert into StockQuote
                a1.action as action, b1.price as priceA, b2.price as priceB

Counting patterns

You can count the number of event occurrences of the same event stream with minimum and maximum limits. For example, <1:4> means 1 to 4 events, <2:> means 2 or more events, and <3> means exactly 3 events.

For every two or more infoStock events, the StockExchangeStream will be matched for three events having price > 70 followed by one to four events having price > 75.

from every a1 = infoStock[action == "buy"]<2:> ->
                b1 = StockExchangeStream[price > 70]<3> ->
                b2 = StockExchangeStream[price > 75]<1:4>
insert into StockQuote
               a1[0].action as action, b1.price as priceA, b2[2].price as priceB


When referring to the result events matching a count pattern, square brackets should be used to access a specific occurrence of that event. In the above example, a1[0] refers to the first of the events that matched the pattern and arrived via the ‘infoStock’ stream.

 

Sequence processing

from <event-regular-expression-of-streams> within <time>
insert into <stream-name> <attribute-name> {<attribute-name>}

With patterns, there can be other events in between the events that match the pattern condition. In contrast, sequences must exactly match the sequence of events without any other events in between.

1. Sequence processing uses one or more streams.
2. As input, it takes a sequence of conditions defined in a simple regular expression fashion.
3. The events of the input streams should be assigned names in order to uniquely identify these events when constructing the query projection. 
4. It generates the output event stream such that each event in the output stream is a collection of events that arrived from the input streams in exactly the order defined in the sequence.
5. For a sequence, the output attribute must be named using the ‘as’ keyword, and it will be used as the output attribute name.

When “within <time>” is used, just like with patterns, Siddhi will output only the events that are within that time of each other.
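
The difference between a pattern and a sequence can be illustrated with a small Python sketch over a single list of arriving events. This is an illustration under assumptions, not Siddhi's matcher: the function names are hypothetical, each step is a plain condition without the regular expression operators, and “within” is not modelled.

```python
def matches_as_pattern(events, conditions):
    """True if the conditions are met in order, allowing other events in between."""
    step = 0
    for event in events:
        if step < len(conditions) and conditions[step](event):
            step += 1  # this event satisfied the current step; move to the next one
    return step == len(conditions)


def matches_as_sequence(events, conditions):
    """True if some run of consecutive events meets the conditions in order."""
    n = len(conditions)
    for start in range(len(events) - n + 1):
        run = events[start:start + n]
        if all(cond(event) for cond, event in zip(conditions, run)):
            return True
    return False


# Two steps: a 'buy' event, then an event with price > 70.
steps = [
    lambda e: e["action"] == "buy",
    lambda e: e["price"] > 70,
]

# Hypothetical arrivals with an unrelated event between the two matching ones.
arrivals = [
    {"action": "buy", "price": 0},
    {"action": "hold", "price": 10},
    {"action": "hold", "price": 80},
]

as_pattern = matches_as_pattern(arrivals, steps)
as_sequence = matches_as_sequence(arrivals, steps)
print(as_pattern, as_sequence)
```

The intervening event with price 10 does not prevent the pattern from matching, but it breaks the sequence because the two matching events are no longer adjacent.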

After one or more occurrences of an infoStock event with action == "buy", the query matches StockExchangeStream events: at most one event with price > 70, followed by one event with price >= 75.

from every a1 = infoStock[action == "buy"]+,
                b1 = StockExchangeStream[price > 70]?,
               b2 = StockExchangeStream[price >= 75]
insert into StockQuote
            a1[0].action as action, b1.price as priceA, b2.price as priceB

The following regular expression operators are supported:

* Zero or more matches (reluctant).
+ One or more matches (reluctant).
? Zero or one match (reluctant).
or Either of two conditions matches (logical or).

Similar to the pattern’s count operation, the ‘*’ and ‘+’ regex operators can also match multiple event occurrences. Hence, square brackets must be used to access a specific occurrence of such an event. In the above example, a1[0] refers to the first matching event that arrived via the infoStock stream.

External Calls (Not supported in 1.0)

Instead of directly returning or inserting the results to the output stream, Siddhi allows users to send the event to an external function and get the results.

from <stream>
insert into <stream-name>
call <function-name> ‘(’ <parameters> ‘)’ ( {<attribute-name>}| ‘*’)

Example,
For StockStream events having symbol == ‘IBM’, Siddhi will query the StockTable table of the myStockDb database for all the entries having symbol == ‘IBM’ and output them via IBMStream with the attributes symbol, price and volume.

from StockStream[symbol=='IBM']
insert into IBMStream
     call sql(myStockDb,
               "select symbol, price, vol from StockTable where
               symbol==${StockStream.symbol}",
              "{symbol string, price int, volume float}");