This guide explains how to use Siddhi Query Language 3.0 with WSO2 CEP, with examples.
Introduction to Siddhi Query Language
Siddhi Query Language (SiddhiQL) is designed to process event streams to identify complex event occurrences. The following table provides definitions of a few terms in the Siddhi Query Language.
Term | Definition |
---|---|
Event Stream | A logical series of events ordered in time. |
Event Stream Definition | This defines the schema of an event stream. An event stream has a unique name and a set of attributes assigned specific types, with names that are unique within the stream. |
Event | An event is associated with only one event stream, and all events of that stream have an identical set of attributes assigned specific types (or the same schema). An event contains a timestamp and the attribute values according to the schema. |
Attribute | An attribute has a unique name within the event stream. The attribute type can be string, int, long, float, double, bool or object. |
Query | A logical construct that derives new streams by combining existing streams. A query contains one or more input streams, handlers to modify those input streams, and an output stream to which it publishes its output events. |
Partition | A logical container that processes a subset of the queries based on a pre-defined rule of separation. |
Event Table | A structured representation of stored data, allowing stored data to be accessed and manipulated at runtime. |
Siddhi has the following language constructs:
- Event Stream Definitions
- Event Table Definitions
- Partitions
- Queries
The execution logic of Siddhi can be composed together as an execution plan, and all the above language constructs can be written as a script in an execution plan. Each construct should be separated by a semicolon (;).
Event stream
An event stream is a sequence of events with a defined schema. One or more event streams can be imported and manipulated using queries in order to identify complex event conditions, and new event streams are created to notify query responses.
Event stream definition
The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of attributes assigned specific types, with uniquely identifiable names within the stream.
define stream <stream name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );
E.g. A stream named TempStream can be created with the following attributes, as shown below.
Attribute Name | Attribute Type |
---|---|
deviceID | long |
roomNo | int |
temp | double |
define stream TempStream (deviceID long, roomNo int, temp double);
Query
Each Siddhi query can consume one or more event streams and create a new event stream from them.
All queries contain an input section and an output section. Some also contain a projection section. A simple query with all three sections is as follows.
from <input stream name> select <attribute name>, <attribute name>, ... insert into <output stream name>
e.g., If you want to derive only the room number and temperature from the TempStream event stream defined above, a query can be defined as follows.
from TempStream select roomNo, temp insert into RoomTempStream;
Inferred Stream: Here, RoomTempStream is an inferred stream, i.e., RoomTempStream can be used as an input stream for another query without explicitly defining its event stream definition, because its event stream definition is inferred from the above query.
Query projection
SiddhiQL supports the following for query projection.
Action | Description |
---|---|
Selecting required attributes for projection | Selects only some of the attributes in an input stream to be inserted into an output stream. e.g., The following query selects only the roomNo and temp attributes from the TempStream stream: from TempStream select roomNo, temp insert into RoomTempStream; |
Selecting all attributes for projection | Selects all the attributes in an input stream to be inserted into an output stream. This can be done by using the asterisk sign (*) or by omitting the select statement. e.g., Use one of the following queries to select all the attributes in the TempStream stream: from TempStream select * insert into NewTempStream; or from TempStream insert into NewTempStream; |
Renaming attributes | Selects attributes from the input stream and inserts them into the output stream with different names. e.g., The following query renames roomNo to roomNumber and temp to temperature: from TempStream select roomNo as roomNumber, temp as temperature insert into RoomTempStream; |
Introducing a default value | Adds a default value and assigns it to an attribute using as. e.g., from TempStream select roomNo, temp, 'C' as scale insert into RoomTempStream; |
Using mathematical and logical expressions | Uses attributes in mathematical and logical expressions, evaluated according to operator precedence, and assigns the results to output attributes using as. e.g., Converting Celsius to Fahrenheit and identifying server rooms: from TempStream select roomNo, temp * 9/5 + 32 as temp, 'F' as scale, roomNo >= 100 and roomNo < 110 as isServerRoom insert into RoomTempStream; |
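The last projection example can be mimicked in plain Python to show what each output event contains. This is a minimal sketch that assumes events are represented as dicts; the helper `project` is hypothetical and does not reflect how Siddhi evaluates selectors internally.

```python
# Hypothetical helper mimicking:
#   select roomNo, temp * 9/5 + 32 as temp, 'F' as scale,
#          roomNo >= 100 and roomNo < 110 as isServerRoom
def project(event: dict) -> dict:
    room_no = event["roomNo"]
    return {
        "roomNo": room_no,
        "temp": event["temp"] * 9 / 5 + 32,      # Celsius to Fahrenheit
        "scale": "F",                            # default (constant) value
        "isServerRoom": 100 <= room_no < 110,    # logical expression
    }

temp_stream = [
    {"deviceID": 1, "roomNo": 105, "temp": 25.0},
    {"deviceID": 2, "roomNo": 230, "temp": 40.0},
]
# Attributes not selected (deviceID) are dropped from the output events.
room_temp_stream = [project(e) for e in temp_stream]
print(room_temp_stream)
```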
Functions
A function consumes zero, one or more function parameters and produces a result value.
Function parameters
Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.
Time is a special parameter that can be defined using an int time value followed by its unit, as <int> <unit>. The supported unit types are listed below. When evaluated, a time parameter returns its value in milliseconds as a long.
Unit | Syntax |
---|---|
Year | year, years |
Month | month, months |
Week | week, weeks |
Day | day, days |
Hour | hour, hours |
Minute | minute, minutes, min |
Second | second, seconds, sec |
Millisecond | millisecond, milliseconds |
E.g. Passing 1 hour and 25 minutes to a function named test:
test(1 hour 25 min)
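The millisecond conversion described above can be checked with a small Python sketch. It assumes that consecutive <int> <unit> pairs are summed, as the "1 hour 25 min" example implies. Year and month are deliberately left out because this guide does not state how many milliseconds Siddhi assigns to them. `time_to_millis` is a hypothetical helper, not a Siddhi API.

```python
# Milliseconds per fixed-length unit from the table above.
# Year and month are omitted: their exact length in Siddhi is not assumed here.
MS_PER_UNIT = {
    "week": 7 * 24 * 60 * 60 * 1000,
    "day": 24 * 60 * 60 * 1000,
    "hour": 60 * 60 * 1000,
    "minute": 60 * 1000,
    "second": 1000,
    "millisecond": 1,
}
ALIASES = {
    "weeks": "week", "days": "day", "hours": "hour",
    "minutes": "minute", "min": "minute",
    "seconds": "second", "sec": "second",
    "milliseconds": "millisecond",
}

def time_to_millis(expr: str) -> int:
    """Converts an expression such as '1 hour 25 min' into milliseconds."""
    tokens = expr.split()
    if len(tokens) % 2 != 0:
        raise ValueError(f"expected <int> <unit> pairs, got: {expr!r}")
    total = 0
    for value, unit in zip(tokens[0::2], tokens[1::2]):
        total += int(value) * MS_PER_UNIT[ALIASES.get(unit, unit)]
    return total

print(time_to_millis("1 hour 25 min"))  # 5100000
```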
Functions, mathematical expressions, and logical expressions can be used in a nested manner.
Inbuilt Functions
Siddhi supports the following inbuilt functions.
- coalesce
- convert
- instanceOfBoolean
- instanceOfDouble
- instanceOfFloat
- instanceOfInteger
- instanceOfLong
- instanceOfString
- UUID
- cast
- ifThenElse
e.g., The following query converts the room number to a string and adds a message ID to each event using the convert and UUID functions.
from TempStream select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID insert into RoomTempStream;
Filter
Filters can be used with input streams to filter events based on the given filter condition. Filter conditions should be defined in square brackets next to the input stream name as shown below.
from <input stream name>[<filter condition>] select <attribute name>, <attribute name>, ... insert into <output stream name>
e.g., The following query filters events from all server rooms that have a temperature greater than 40 degrees.
from TempStream [(roomNo >= 100 and roomNo < 110) and temp > 40 ] select roomNo, temp insert into HighTempStream;
Window
Windows allow you to capture a subset of events from an input event stream, based on a given criterion, for calculation. They can be defined next to input streams using the '#window.' prefix. Each input stream can have a maximum of one window, as follows.
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... ) select <attribute name>, <attribute name>, ... insert into <output stream name>
Windows emit two kinds of events: current events and expired events. A window emits a current event when a new event arrives at the window, and emits an expired event whenever an event in the window expires based on that window's criteria.
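The current/expired behavior can be illustrated with a Python sketch of a sliding length window, similar in spirit to #window.length(n). `LengthWindow` is a hypothetical class that only mimics the observable behavior described above; it is not Siddhi's implementation, and the order of the two events returned by one call is a choice of this sketch.

```python
from collections import deque

class LengthWindow:
    """Sketch of a sliding length window: every arriving event is emitted as
    a CURRENT event; once more than `length` events are held, the oldest one
    leaves the window and is emitted as an EXPIRED event."""

    def __init__(self, length: int):
        self.length = length
        self.events = deque()

    def process(self, event):
        output = []
        self.events.append(event)
        if len(self.events) > self.length:
            output.append(("EXPIRED", self.events.popleft()))
        output.append(("CURRENT", event))
        return output

window = LengthWindow(2)
outputs = [window.process({"temp": t}) for t in (20.0, 21.0, 22.0)]
print(outputs[-1])  # [('EXPIRED', {'temp': 20.0}), ('CURRENT', {'temp': 22.0})]
```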
Inbuilt Windows
Siddhi supports the following inbuilt windows.
Output event categories
Window output can be manipulated based on event categories, i.e., current and expired events. Use the following keywords with the output stream to manipulate the output.
- current events: Emits all the events that arrive at the window. This is the default behavior if no event category is specified.
- expired events: Emits all the events that expire from the window.
- all events: Emits all the events that arrive at and expire from the window.
When using an insert into statement, place the above keywords between 'insert' and 'into', as shown in the example below.
E.g. Delay all events in a stream by 1 minute.
from TempStream#window.time(1 min) select * insert expired events into DelayedTempStream;
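Why forwarding only expired events delays a stream can be seen in a Python sketch of a sliding time window driven by explicit millisecond timestamps instead of a real clock. `TimeWindow` is hypothetical, and it assumes an event expires once its timestamp plus the window length is less than or equal to the current time; Siddhi's own scheduling may differ in detail.

```python
from collections import deque

class TimeWindow:
    """Sketch of a sliding time window (like #window.time(t)) that is
    advanced by explicit timestamps in milliseconds."""

    def __init__(self, duration_ms: int):
        self.duration_ms = duration_ms
        self.events = deque()  # (timestamp, event) pairs, oldest first

    def advance(self, now: int):
        """Removes and returns the events that have expired by time `now`."""
        expired = []
        while self.events and self.events[0][0] + self.duration_ms <= now:
            expired.append(self.events.popleft()[1])
        return expired

    def process(self, now: int, event):
        expired = self.advance(now)
        self.events.append((now, event))
        return expired, [event]  # (expired events, current events)

# Mimics "insert expired events into DelayedTempStream": only expired events
# are forwarded, so each event reappears 1 minute after it arrived.
window = TimeWindow(60_000)
delayed = []
for ts, temp in [(0, 20.0), (30_000, 21.0)]:
    expired, _current = window.process(ts, {"temp": temp})
    delayed.extend(expired)
delayed.extend(window.advance(60_000))  # the first event expires at t = 60 s
print(delayed)  # [{'temp': 20.0}]
```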
Aggregate functions
Aggregate functions can be used with windows to perform aggregate calculations within the defined window.
Inbuilt aggregate functions
Siddhi supports the following inbuilt aggregate functions.
E.g. Upon every event arrival and expiry, notify the average temperature of all rooms, based on all the events that arrived during the last 10 minutes.
from TempStream#window.time(10 min) select avg(temp) as avgTemp, roomNo, deviceID insert all events into AvgTempStream;
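One way to see how an aggregate can follow a window is to update it on both event categories: add a value on a current event and remove it on an expired event. The Python sketch below shows this for avg(); `AvgAggregator` is hypothetical and is an illustration of the idea, not Siddhi's aggregator code. Returning None when the window is empty is also a choice of this sketch.

```python
class AvgAggregator:
    """Sketch of a windowed average maintained incrementally:
    CURRENT events add to the running sum, EXPIRED events subtract."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def process(self, kind: str, value: float):
        if kind == "CURRENT":
            self.total += value
            self.count += 1
        elif kind == "EXPIRED":
            self.total -= value
            self.count -= 1
        else:
            raise ValueError(f"unknown event category: {kind}")
        return self.total / self.count if self.count else None

agg = AvgAggregator()
events = [("CURRENT", 20.0), ("CURRENT", 30.0), ("EXPIRED", 20.0)]
results = [agg.process(kind, temp) for kind, temp in events]
print(results)  # [20.0, 25.0, 30.0]
```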
Group by
Group by allows aggregations to be calculated separately for each group, based on the group by attributes.
E.g. Find the average temperature per room and device ID for the last 10 min.
from TempStream#window.time(10 min) select avg(temp) as avgTemp, roomNo, deviceID group by roomNo, deviceID insert into AvgTempStream;
Having
Having allows us to filter events after aggregation and after processing at the selector.
E.g. Find the average temperature per room for the last 10 min and alert if it's more than 30 degrees.
from TempStream#window.time(10 min) select avg(temp) as avgTemp, roomNo group by roomNo having avgTemp > 30 insert into AlertStream;
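The interplay of group by and having can be sketched in Python over the events currently held in a window. For simplicity, `avg_by_room_having` (a hypothetical helper) evaluates the whole window as one batch, whereas Siddhi emits an updated result as events arrive; the point shown is that having filters rows after aggregation.

```python
from collections import defaultdict

def avg_by_room_having(window_events, threshold=30.0):
    """Sketch of: select avg(temp) as avgTemp, roomNo group by roomNo
    having avgTemp > threshold, over the events currently in the window."""
    sums = defaultdict(lambda: [0.0, 0])  # roomNo -> [sum of temps, count]
    for e in window_events:
        sums[e["roomNo"]][0] += e["temp"]
        sums[e["roomNo"]][1] += 1
    rows = [{"avgTemp": s / n, "roomNo": room} for room, (s, n) in sums.items()]
    # 'having' is applied to the aggregated rows produced by the selector.
    return [r for r in rows if r["avgTemp"] > threshold]

window_events = [
    {"roomNo": 1, "temp": 28.0},
    {"roomNo": 1, "temp": 34.0},
    {"roomNo": 2, "temp": 25.0},
]
print(avg_by_room_having(window_events))  # [{'avgTemp': 31.0, 'roomNo': 1}]
```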
Output rate limiting
Output rate limiting allows queries to emit events periodically based on the condition specified.
Rate limiting follows the below syntax.
from <input stream name>... select <attribute name>, <attribute name>, ... output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>) insert into <output stream name>
With "<output-type>", the events to be emitted can be specified: "first", "last", and "all" are the possible keywords, which emit only the first event, only the last event, or all events from the arrived events, respectively. If the keyword is omitted, it defaults to "all", emitting all events.
With "<time interval>", the time interval for periodic event emission can be specified.
With "<event interval>", the number of events that need to arrive for each periodic event emission can be specified.
Based on number of events
Here, events are emitted every time the predefined number of events has arrived. When emitting, it can be specified to emit only the first event, the last event, or all events from the arrived events.
e.g., Emit the last temperature event per sensor every 10 events
from TempStream select temp group by deviceID output last every 10 events insert into LowRateTempStream;
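The event-count rate limiting above can be sketched in Python, assuming (as the example states) that with group by the last event is kept per group within each batch of N arriving events. `output_last_every` is a hypothetical helper; events left in an incomplete batch are simply not emitted in this sketch.

```python
def output_last_every(events, n, key):
    """Sketch of 'output last every n events' with group by `key`:
    after every n arriving events, emit the last event seen for each
    group in that batch, then start a new batch."""
    batch = {}
    emitted = []
    for i, e in enumerate(events, start=1):
        batch[e[key]] = e  # a later event of the same group replaces the earlier one
        if i % n == 0:
            emitted.extend(batch.values())
            batch = {}
    return emitted

events = [{"deviceID": d, "temp": t}
          for d, t in [(1, 20.0), (2, 21.0), (1, 22.0), (2, 23.0), (1, 24.0)]]
print(output_last_every(events, 4, "deviceID"))
# [{'deviceID': 1, 'temp': 22.0}, {'deviceID': 2, 'temp': 23.0}]
```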
Based on time
Here, events are emitted at every predefined time interval. When emitting, it can be specified to emit only the first event, the last event, or all events from the arrived events.
E.g. Emit all temperature events every 10 seconds.
from TempStream output every 10 sec insert into LowRateTempStream;
Periodic snapshot
This works best with windows. When a window is attached to the input stream, snapshot rate limiting emits, at every predefined time interval, all the current events that have arrived so far and do not have corresponding expired events. When no window is attached to the input stream, it emits only the last current event for every predefined time interval.
E.g. Emit a snapshot of the events in a 5-second time window every second.
from TempStream#window.time(5 sec) output snapshot every 1 sec insert into SnapshotTempStream;
Joins
Join allows two event streams to be merged based on a condition. Here, each stream should be associated with a window (if no window is assigned, #window.length(0) is assigned to the input event stream). During the joining process, each incoming event on each stream is matched against all the events in the other input event stream's window based on the given condition, and an output event is generated for every matching event pair.
The syntax of join looks like below.
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} join <input stream name>#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} on <join condition> within <time gap> select <attribute name>, <attribute name>, ... insert into <output stream name>
With "on <join condition>", Siddhi joins only the events that match the condition.
With the "unidirectional" keyword, the trigger of the joining process can be controlled. By default, events arriving on both streams trigger the joining process; when the unidirectional keyword is used on an input stream, only the events arriving on that stream trigger the joining process. Note that the unidirectional keyword cannot be used for both input streams (as that is equal to the default behavior, i.e., not using the unidirectional keyword at all).
With "within <time gap>", the joining process matches only the events that are within the defined time gap of each other.
When projecting the joined events, the attributes of each stream need to be referred to with the stream name (e.g., <stream name>.<attribute name>) or with its reference ID, especially when events of the same stream are joined (e.g., <stream reference ID>.<attribute name>). If all attributes of the joined events need to be projected, "select *" can be used or the "select" statement itself can be omitted, but only when the two streams do not have any attributes with the same names.
e.g., Assume that the temperature regulators are updated every minute. The following switches on the temperature regulators if they are not already on for all the rooms that have a room temperature greater than 30 degrees.
define stream TempStream(deviceID long, roomNo int, temp double); define stream RegulatorStream(deviceID long, roomNo int, isOn bool); from TempStream[temp > 30.0]#window.time(1 min) as T join RegulatorStream[isOn == false]#window.length(1) as R on T.roomNo == R.roomNo select T.roomNo, R.deviceID, 'start' as action insert into RegulatorActionStream;
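The matching rule described above (an arriving event is compared with every event in the other stream's window) can be sketched in Python. `WindowJoin` is a hypothetical class that uses length windows on both sides for simplicity, so it does not model the 1-minute time window of the example; it also assumes the filters (temp > 30.0, isOn == false) have already been applied. Note that `deque(maxlen=0)` never holds anything, which mirrors the #window.length(0) default.

```python
from collections import deque

class WindowJoin:
    """Sketch of a window join: an event arriving on one side is matched
    against every event currently in the other side's window."""

    def __init__(self, left_len, right_len, condition, unidirectional=None):
        self.windows = {"left": deque(maxlen=left_len),
                        "right": deque(maxlen=right_len)}
        self.condition = condition            # condition(left, right) -> bool
        self.unidirectional = unidirectional  # None, "left" or "right"

    def on_event(self, side, event):
        other = "right" if side == "left" else "left"
        matches = []
        # Only the unidirectional side (or both, by default) triggers a join.
        if self.unidirectional in (None, side):
            for o in self.windows[other]:
                left, right = (event, o) if side == "left" else (o, event)
                if self.condition(left, right):
                    matches.append((left, right))
        self.windows[side].append(event)
        return matches

join = WindowJoin(10, 1, lambda t, r: t["roomNo"] == r["roomNo"])
join.on_event("right", {"deviceID": 7, "roomNo": 2, "isOn": False})
pairs = join.on_event("left", {"deviceID": 1, "roomNo": 2, "temp": 35.0})
actions = [{"roomNo": t["roomNo"], "deviceID": r["deviceID"], "action": "start"}
           for t, r in pairs]
print(actions)  # [{'roomNo': 2, 'deviceID': 7, 'action': 'start'}]
```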
Outer joins
The syntax of an outer join is as follows.
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} (left|right|full) outer join <input stream name>#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>} on <join condition> within <time gap> select <attribute name>, <attribute name>, ... insert into <output stream name>
Left outer join
Left outer join allows two event streams to be merged based on a condition, but it returns all the events of the left stream even if there are no matching events in the right stream. Here, each stream should be associated with a window. During the joining process, each incoming event of each stream is matched against all the events in the other input event stream's window based on the given condition, and an output event is generated for all the matching event pairs. In addition, an output event is generated for incoming events of the left stream even if there are no matching events in the right stream.
e.g., The following generates output events for all the events in the stockStream stream, whether or not there is a match for the symbol in the twitterStream stream.
from stockStream#window.length(2) left outer join twitterStream#window.length(1) on stockStream.symbol== twitterStream.symbol select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price insert all events into outputStream ;
Right outer join
This is similar to a left outer join, but it returns all the events of the right stream even if there are no matching events in the left stream. Incoming events of the left stream are matched against all events in the right event stream's window based on the given condition, and an output event is generated for all the matching event pairs. An output event is also generated for incoming events of the right stream even if there are no matching events in the left stream.
e.g., The following generates output events for all the events in the twitterStream stream, whether or not there is a match for the symbol in the stockStream stream.
from stockStream#window.length(2) right outer join twitterStream#window.length(1) on stockStream.symbol== twitterStream.symbol select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price insert all events into outputStream ;
Full outer join
The full outer join combines the results of left outer join and right outer join. An output event is generated for each incoming event even if there are no matching events in the other stream.
e.g., The following generates output events for all the incoming events of each stream whether there is a match for the symbol in the other stream or not.
from stockStream#window.length(2) full outer join twitterStream#window.length(1) on stockStream.symbol== twitterStream.symbol select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price insert all events into outputStream ;
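The three outer join variants differ only in which side's unmatched arrivals are still emitted, paired with null. The Python sketch below shows that difference with length windows, matching the stockStream/twitterStream examples. `OuterWindowJoin` is a hypothetical class; None stands in for the missing side's attributes.

```python
from collections import deque

class OuterWindowJoin:
    """Sketch of left/right/full outer joins over length windows. Matching
    works as in an inner join; an arriving event on a preserved side that
    finds no match is emitted paired with None."""

    def __init__(self, kind, left_len, right_len, condition):
        if kind not in ("left", "right", "full"):
            raise ValueError(kind)
        self.kind = kind
        self.windows = {"left": deque(maxlen=left_len),
                        "right": deque(maxlen=right_len)}
        self.condition = condition  # condition(left, right) -> bool

    def on_event(self, side, event):
        other = "right" if side == "left" else "left"
        out = []
        for o in self.windows[other]:
            left, right = (event, o) if side == "left" else (o, event)
            if self.condition(left, right):
                out.append((left, right))
        preserved = self.kind == "full" or self.kind == side
        if not out and preserved:
            out.append((event, None) if side == "left" else (None, event))
        self.windows[side].append(event)
        return out

same_symbol = lambda s, t: s["symbol"] == t["symbol"]
left_join = OuterWindowJoin("left", 2, 1, same_symbol)
stock = {"symbol": "WSO2", "price": 55.0}
print(left_join.on_event("left", stock))  # [({'symbol': 'WSO2', 'price': 55.0}, None)]
```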
Pattern
Pattern allows event streams to be correlated over time, detecting event patterns based on the order of event arrival. With a pattern, other events can arrive in between the events that match the pattern condition. Siddhi internally creates state machines to track the states of the matching process. A pattern can correlate events over multiple input streams or over the same input stream; hence, each matched input event needs to be referenced so that it can be accessed for further processing and output generation.
The syntax of pattern looks like below.
from {every} <input event reference>=<input stream name>[<filter condition>] -> {every} <input event reference>=<input stream name>[<filter condition>] -> ... within <time gap> select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ... insert into <output stream name>
Input Streams cannot be associated with a window.
With "->", incoming event arrivals can be correlated, allowing zero or more other events to arrive in between the matching events.
With "<input event reference>=", the matched event can be stored for future reference.
With "within <time gap>", the pattern is matched only with events that are within the defined time gap of each other.
Without the "every" keyword, the pattern is matched only once; use the "every" keyword appropriately to trigger a pattern matching process upon event arrival.
E.g. Alert if temperature of a room increases by 5 degrees within 10 min.
from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ] within 10 min select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStream;
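The state tracking behind this pattern can be approximated in Python: every event opens a new partial match (because of every on e1), each partial match waits for the first later event that satisfies the e2 condition, unrelated events may arrive in between, and partial matches older than the within gap are dropped. `detect_temp_rise` is a hypothetical helper; it is a simplified model of the semantics, not Siddhi's state machine.

```python
def detect_temp_rise(events, rise=5.0, within_ms=10 * 60_000):
    """Sketch of: from every(e1=TempStream) -> e2=TempStream[e1.roomNo==roomNo
    and (e1.temp + 5) <= temp] within 10 min.
    `events` is a list of (timestamp_ms, event_dict) in arrival order."""
    pending = []  # partial matches: e1 events still waiting for an e2
    alerts = []
    for ts, e in events:
        still_pending = []
        for ts1, e1 in pending:
            if ts - ts1 > within_ms:
                continue  # outside the 'within' gap: discard this partial match
            if e1["roomNo"] == e["roomNo"] and e1["temp"] + rise <= e["temp"]:
                alerts.append({"roomNo": e1["roomNo"],
                               "initialTemp": e1["temp"], "finalTemp": e["temp"]})
            else:
                still_pending.append((ts1, e1))  # other events may come in between
        pending = still_pending
        pending.append((ts, e))  # 'every': each event starts a new partial match
    return alerts

events = [
    (0, {"roomNo": 1, "temp": 20.0}),
    (60_000, {"roomNo": 2, "temp": 40.0}),           # unrelated event in between
    (120_000, {"roomNo": 1, "temp": 26.0}),          # +6 degrees within 10 min
    (20 * 60_000, {"roomNo": 1, "temp": 40.0}),      # too late for earlier events
]
print(detect_temp_rise(events))  # [{'roomNo': 1, 'initialTemp': 20.0, 'finalTemp': 26.0}]
```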
Logical pattern
A pattern not only matches events arriving in temporal order, but can also correlate events that have logical relationships.
The keywords "and" and "or" can be used instead of "->" to express the logical relationship.
With "and", the occurrence of two events in any order can be matched.
With "or", the occurrence of an event from either of the input streams can be matched.
E.g. Alert when the room temperature reaches the temperature set on the regulator (the pattern matching should be reset whenever the temperature set on the regulator changes).
define stream TempStream(deviceID long, roomNo int, temp double); define stream RegulatorStream(deviceID long, roomNo int, tempSet double); from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo==roomNo and e1.tempSet <= temp ] or e3=RegulatorStream[e1.roomNo==roomNo] select e1.roomNo, e2.temp as roomTemp having e3 is null insert into AlertStream;
Counting pattern
Counting patterns allow multiple events to be matched based on the same matching condition. The expected number of events can be limited using the following postfixes.
With <1:4> matches 1 to 4 events
With <2:> matches 2 or more events and with <:5> up to 5 events.
With <5> matches exactly 5 events.
To refer to specific occurrences of the events matched within the count limits, square brackets can be used with numerical indexes and the "last" keyword: e.g., e1[3] will refer to the third event, e1[last] will refer to the last event, and e1[last - 1] will refer to the event before the last event of the matched event group.
E.g. Get the temperature difference between two regulator events.
define stream TempStream(deviceID long, roomNo int, temp double); define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool); from every( e1=RegulatorStream) -> e2=TempStream[e1.roomNo==roomNo]<1:> -> e3=RegulatorStream[e1.roomNo==roomNo] select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff insert into TempDiffStream;
Sequence
Sequence allows event streams to be correlated over time, detecting event sequences based on the order of event arrival. With a sequence, no other events can arrive in between the events that match the sequence condition. Siddhi internally creates state machines to track the states of the matching process. A sequence can correlate events over multiple input streams or over the same input stream; hence, each matched input event needs to be referenced so that it can be accessed for further processing and output generation.
The syntax of sequence looks like below.
from {every} <input event reference>=<input stream name>[<filter condition>], <input event reference>=<input stream name>[<filter condition>]{+|*|?}, ... within <time gap> select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ... insert into <output stream name>
Input Streams cannot be associated with a window.
With ",", immediately consecutive incoming event arrivals can be correlated, with no other events arriving in between the matching events.
With "<input event reference>=", the matched event can be stored for future reference.
With "within <time gap>", the sequence is matched only with events that are within the defined time gap of each other.
Without the "every" keyword, the sequence is matched only once; use the "every" keyword at the beginning to trigger a sequence matching process upon every event arrival.
E.g. Alert if there is more than 1 degree increase in temperature between two consecutive temperature events.
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp ] select e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStream;
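Because a sequence requires e2 to be the event immediately after e1, and every starts a new match on each event, this query effectively checks each pair of consecutive events. The Python sketch below shows that reduction; `detect_consecutive_rise` is a hypothetical helper. In the sample data, 20.0 followed later by 22.0 is not reported because 20.5 arrives in between, which is where a sequence differs from a pattern.

```python
def detect_consecutive_rise(temps, delta=1.0):
    """Sketch of: from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
    e2 must be the very next event after e1, so only consecutive pairs are compared."""
    alerts = []
    for prev, curr in zip(temps, temps[1:]):
        if prev + delta < curr:
            alerts.append({"initialTemp": prev, "finalTemp": curr})
    return alerts

print(detect_consecutive_rise([20.0, 20.5, 22.0, 21.0, 23.5]))
# [{'initialTemp': 20.5, 'finalTemp': 22.0}, {'initialTemp': 21.0, 'finalTemp': 23.5}]
```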
Logical sequence
A sequence not only matches consecutive events arriving in temporal order, but can also correlate events that have logical relationships.
The keywords "and" and "or" can be used instead of "," to express the logical relationship.
With "and", the occurrence of two events in any order can be matched.
With "or", the occurrence of an event from either of the input streams can be matched.
E.g. Notify when a regulator event is followed by both the temperature and humidity events.
define stream TempStream(deviceID long, temp double); define stream HumidStream(deviceID long, humid double); define stream RegulatorStream(deviceID long, isOn bool); from every e1=RegulatorStream, e2=TempStream and e3=HumidStream select e2.temp, e3.humid insert into StateNotificationStream;
Counting sequence
Counting sequences allow multiple consecutive events to be matched based on the same matching condition. The expected number of events can be limited using the following postfixes.
With "*" zero or more events can be matched.
With "+" one or more events can be matched.
With "?" zero or one events can be matched.
To refer to specific occurrences of the events matched within the count limits, square brackets can be used with numerical indexes and the "last" keyword: e.g., e1[3] will refer to the third event, e1[last] will refer to the last event, and e1[last - 1] will refer to the event before the last event of the matched event group.
E.g. Identify peak temperatures.
define stream TempStream(deviceID long, roomNo int, temp double); define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool); from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp] select e1.temp as initialTemp, e2[last].temp as peakTemp insert into TempDiffStream;
Partition
With partitions, Siddhi can divide both incoming events and queries, and process them in parallel and in isolation. Each partition is tagged with a partition key, and only the events corresponding to the given partition key are processed in that partition. Each partition has separate instances of the Siddhi queries, providing isolation of processing states. A partition can contain more than one query.
A partition key can be defined either by using a categorical (string) attribute of the input event stream (variable partition), or by defining separate ranges over a numerical attribute of the input event stream (range partition).
Variable partition
Partition using categorical (string) attributes will adhere to the following syntax.
partition with ( <attribute name> of <stream name>, <attribute name> of <stream name>, ... ) begin <query>; <query>; ... end;
E.g. Per sensor, calculate the maximum temperature over last 10 temperature events each sensor has emitted.
partition with ( deviceID of TempStream ) begin from TempStream#window.length(10) select roomNo, deviceID, max(temp) as maxTemp insert into DeviceTempStream; end;
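The isolation that a variable partition provides can be sketched in Python by keeping one window per partition key, so events of one device never affect another device's result. `PartitionedMax` is a hypothetical class that models only this isolation, not Siddhi's parallel execution.

```python
from collections import defaultdict, deque

class PartitionedMax:
    """Sketch of: partition with (deviceID of TempStream) begin
    from TempStream#window.length(10) select ..., max(temp) as maxTemp ... end;
    Each deviceID gets its own, isolated length window."""

    def __init__(self, window_length=10):
        self.windows = defaultdict(lambda: deque(maxlen=window_length))

    def process(self, event):
        window = self.windows[event["deviceID"]]  # state for this key only
        window.append(event["temp"])
        return {"roomNo": event["roomNo"], "deviceID": event["deviceID"],
                "maxTemp": max(window)}

p = PartitionedMax(window_length=2)
for dev, temp in [(1, 30.0), (2, 50.0), (1, 20.0), (1, 10.0)]:
    print(p.process({"deviceID": dev, "roomNo": dev, "temp": temp})["maxTemp"])
# 30.0, 50.0, 30.0, 20.0 (device 2's reading never affects device 1)
```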
Range partition
Partition using numerical attributes will adhere to the following syntax.
partition with ( <condition> as <partition key> or <condition> as <partition key> or ... of <stream name>, ... ) begin <query>; <query>; ... end;
E.g. Per office area calculate the average temperature over last 10 minutes.
partition with ( roomNo>=1030 as 'serverRoom' or roomNo<1030 and roomNo>=330 as 'officeRoom' or roomNo<330 as 'lobby' of TempStream ) begin from TempStream#window.time(10 min) select roomNo, deviceID, avg(temp) as avgTemp insert into AreaTempStream; end;
Inner streams
Inner streams can be used by the query instances of a partition to communicate with other query instances of the same partition. Inner streams are denoted by a "#" in front of their names, and they cannot be accessed outside the partition block.
E.g. Per sensor, calculate the maximum temperature over last 10 temperature events when the sensor is having an average temperature greater than 20 over the last minute.
partition with ( deviceID of TempStream ) begin from TempStream#window.time(1 min) select roomNo, deviceID, temp, avg(temp) as avgTemp insert into #AvgTempStream; from #AvgTempStream[avgTemp > 20]#window.length(10) select roomNo, deviceID, max(temp) as maxTemp insert into deviceTempStream; end;
Event table
Event table allows Siddhi to work with stored events, and this can be viewed as a stored version of Event Stream or a table of events. By default events will be stored in-memory and Siddhi also provides an extension to work with data/events stored in RDBMS data stores.
Event table definition
Event Table Definition defines the event table schema. An Event Table Definition contains a unique name and a set of typed attributes with uniquely identifiable names within the table.
define table <table name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );
With the above definition, events will be stored in-memory via a linked list data structure.
E.g. A room type table named RoomTypeTable, with the attributes room number (int) and type (string), can be created as shown below.
define table RoomTypeTable (roomNo int, type string);
Indexing event table
Event table can be indexed for fast event access using the "IndexBy" annotation. With "IndexBy" only one attribute can be indexed, and when indexed it uses a map data structure to hold the events. Therefore if multiple events are inserted to the event table having the same index value only last inserted event will remain in the table.
E.g. An indexed room type table named RoomTypeTable, with the attributes room number (int) and type (string) and indexed by room number, can be created as shown below.
@IndexBy('roomNo') define table RoomTypeTable (roomNo int, type string);
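The practical difference between the default table and an @IndexBy table is what happens to duplicate keys. The Python sketch below contrasts the two storage behaviors described above: a Python list stands in for the linked list and a dict stands in for the map. It illustrates the described behavior only and is not the event table implementation.

```python
rows = [{"roomNo": 101, "type": "office"}, {"roomNo": 101, "type": "server"}]

# Default table: events are kept in insertion order, duplicates included.
linked_list_table = []
for row in rows:
    linked_list_table.append(row)

# @IndexBy('roomNo'): events are held in a map keyed by roomNo,
# so a later event with the same key replaces the earlier one.
indexed_table = {}
for row in rows:
    indexed_table[row["roomNo"]] = row

print(len(linked_list_table), indexed_table[101]["type"])  # 2 server
```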
Hazelcast event table
Event tables also support persisting event data in a distributed manner using Hazelcast in-memory data grids. This functionality can easily be enabled using the "From" annotation. Also, the connection instructions for Hazelcast cluster can be assigned to the event table using "From" annotation.
Connection instructions that can be used with "From" annotation:
cluster.name : Hazelcast cluster/group name [Optional] (e.g., cluster.name='cluster_a').
cluster.password : Hazelcast cluster/group password [Optional] (e.g., cluster.password='pass@cluster_a').
cluster.addresses : Hazelcast cluster addresses (ip:port) as a comma-separated string [Optional, client mode only] (e.g., cluster.addresses='192.168.1.1:5700,192.168.1.2:5700').
well.known.addresses : Hazelcast WKAs (ip) as a comma-separated string [Optional, server mode only] (e.g., well.known.addresses='192.168.1.1,192.168.1.2').
collection.name : Hazelcast collection object name [Optional, can be used to share a single table between multiple execution plans] (e.g., collection.name='stockTable').
@from(eventtable = 'hazelcast') define table RoomTypeTable(roomNo int, type string);
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a') define table RoomTypeTable(roomNo int, type string);
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a', cluster.addresses='192.168.1.1:5700,192.168.1.2:5700') define table RoomTypeTable(roomNo int, type string);
RDBMS event table
An event table can be backed by an RDBMS event store using the "From" annotation. With "From", the data source or the connection instructions can be assigned to the event table. The RDBMS table name can be different from the event table name defined in Siddhi, and Siddhi always refers to the defined event table name. However, the defined event table name cannot be the same as an already existing stream name, since syntactically both are considered the same within the Siddhi query language.
RDBMS event table has been tested with the following databases:
- MySQL
- H2
- Oracle
E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by RDBMS table named RoomTable from the data source named AnalyticsDataSource.
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable') define table RoomTypeTable (roomNo int, type string);
Note
The datasource.name given here is injected into the Siddhi engine by the CEP server. To configure data sources in the CEP, see Datasources in the Admin Guide.
E.g. Create an event table with name RoomTypeTable having attributes room number as int & type as string, backed by MySQL table named RoomTable from the database cepdb located at localhost:3306 having user name "root" and password "root".
@From(eventtable='rdbms', jdbc.url='jdbc:mysql://localhost:3306/cepdb', username='root', password='root', driver.name='com.mysql.jdbc.Driver', table.name='RoomTable') define table RoomTypeTable (roomNo int, type string);
Caching events with RDBMS event table
Several caches can be used with RDBMS-backed event tables to reduce I/O operations and improve performance. Currently, all cache implementations use size-based algorithms. A cache is added using the "cache" element, and its size is defined using the "cache.size" element of the "From" annotation.
The supported cache implementations are as follows:
- Basic: Events are cached in a FIFO manner where the oldest event will be dropped when the cache is full.
- LRU (Least Recently Used): The least recently used event is dropped when the cache is full.
- LFU (Least Frequently Used): The least frequently used event is dropped when the cache is full.
In the "From" annotation, if the "cache" element is not specified, the "Basic" cache is used by default, and if the "cache.size" element is not specified, a default cache size of 4096 is used.
E.g. Create an event table named RoomTypeTable with the attributes roomNo (int) and type (string), backed by an RDBMS table and using the least recently used (LRU) caching algorithm to cache 3000 events.
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', cache='LRU', cache.size='3000') define table RoomTypeTable (roomNo int, type string);
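The three eviction policies differ only in which entry is dropped once the cache is full. The following plain-Java sketch models that behaviour so the difference can be tried out directly. It is a hypothetical illustration, not Siddhi's cache code. The class and method names (CacheModels, basic, lru, Lfu) are made up, and LFU ties are broken arbitrarily.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java models of the Basic (FIFO), LRU and LFU policies; not Siddhi code.
class CacheModels {

    // Basic (FIFO): insertion-ordered map that drops its oldest entry once it exceeds maxSize.
    static <K, V> Map<K, V> basic(int maxSize) {
        return new LinkedHashMap<K, V>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // LRU: access-ordered map. Every get() moves the entry to the end,
    // so the eldest entry is always the least recently used one.
    static <K, V> Map<K, V> lru(int maxSize) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // LFU: counts accesses per key and evicts the key with the lowest count when full.
    static final class Lfu<K, V> {
        private final int maxSize;
        private final Map<K, V> values = new HashMap<>();
        private final Map<K, Integer> hits = new HashMap<>();

        Lfu(int maxSize) {
            this.maxSize = maxSize;
        }

        V get(K key) {
            V value = values.get(key);
            if (value != null) {
                hits.merge(key, 1, Integer::sum);
            }
            return value;
        }

        void put(K key, V value) {
            if (!values.containsKey(key) && values.size() >= maxSize) {
                K victim = null;
                int fewest = Integer.MAX_VALUE;
                for (Map.Entry<K, Integer> e : hits.entrySet()) {
                    if (e.getValue() < fewest) {
                        fewest = e.getValue();
                        victim = e.getKey();
                    }
                }
                values.remove(victim);
                hits.remove(victim);
            }
            values.put(key, value);
            hits.merge(key, 1, Integer::sum);
        }

        boolean containsKey(K key) {
            return values.containsKey(key);
        }
    }
}
```

Consider a cache of size 2 that holds keys 1 and 2, where key 1 has just been read, and then key 3 is added. Basic evicts key 1 because it is the oldest. LRU evicts key 2 because key 1 was used more recently.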
Using Bloom filters
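A Bloom filter is a compact probabilistic data structure for performing quick membership checks. If you apply a Bloom filter to a data set and carry out an isAvailable check on that Bloom filter instance, a negative answer is always accurate: when the filter reports that the search item is not available, the item is definitely absent. A positive answer only means that the item may be available. This speeds up updates, joins and isAvailable checks, because items that are not present can be ruled out without a full lookup.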
The following example shows how to include Bloom Filters in an event table update query.
define stream StockStream (symbol string, price float, volume long);
define stream CheckStockStream (symbol string, volume long);
@from(eventtable = 'rdbms', datasource.name = 'cepDB', table.name = 'stockInfo', bloom.filters = 'enable')
define table StockTable (symbol string, price float, volume long);
@info(name = 'query1')
from StockStream
insert into StockTable;
@info(name = 'query2')
from CheckStockStream[(StockTable.symbol == symbol) in StockTable]
insert into OutStream;
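To make the "no false negatives" property concrete, here is a minimal Bloom filter in plain Java. It is a conceptual sketch only. The class name SimpleBloomFilter, the bit-array size, and the double-hashing scheme are assumptions, and it says nothing about how the RDBMS event table implements bloom.filters = 'enable' internally.

```java
import java.util.BitSet;

// Hypothetical minimal Bloom filter: mightContain() never returns false for an added item,
// but it may return true for an item that was never added (a false positive).
class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    SimpleBloomFilter(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    // Derives the i-th bit position from two base hashes (double hashing).
    private int position(String item, int i) {
        int h1 = item.hashCode();
        int h2 = (h1 >>> 16) | 1; // forced odd so successive positions differ
        return Math.floorMod(h1 + i * h2, size);
    }

    void add(String item) {
        for (int i = 0; i < hashCount; i++) {
            bits.set(position(item, i));
        }
    }

    // false: the item was definitely never added; true: the item may have been added.
    boolean mightContain(String item) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(item, i))) {
                return false;
            }
        }
        return true;
    }
}
```

A lookup that returns false can skip the table entirely. Only a true answer needs to be confirmed against the stored data.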
For more information about In-memory event tables, see Sample 0106 - Using in-memory event tables.
For more information about RDBMS event tables, see Sample 0107 - Using RDBMS event tables.
Insert into
The query for inserting events into a table is similar to the query for inserting events into an event stream, and uses the "insert into <table name>" code snippet. To insert only a specific output event category, use the "current events", "expired events" or "all events" keywords between the 'insert' and 'into' keywords.
E.g. Insert all temperature events from TempStream into TempTable.
from TempStream select * insert into TempTable;
Delete
Events can be deleted from an event table using a delete query with the following syntax:
from <input stream name> select <attribute name>, <attribute name>, ... delete <table name> on <condition>
Here, "on <condition>" selects the events to be deleted. When writing this condition, event table attributes must always be qualified with the table name, while the attributes from the select clause must not be qualified.
E.g. Delete the entries of RoomTypeTable associated with the room numbers in DeleteStream.
define table RoomTypeTable (roomNo int, type string); define stream DeleteStream (roomNumber int); from DeleteStream delete RoomTypeTable on RoomTypeTable.roomNo == roomNumber;
To execute a delete only for a specific output event category, use "delete <table name> for <output event category> on <condition>" instead of "delete <table name> on <condition>", where "<output event category>" is one of "current events", "expired events" or "all events".
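The semantics of the delete example above can be modelled in a few lines of plain Java. Each DeleteStream event removes every table row that satisfies the "on" condition. This is a hypothetical illustration: DeleteModel, Room and onDeleteEvent are made-up names, and Siddhi does not execute the query this way internally.

```java
import java.util.List;

// Hypothetical model of:
//   from DeleteStream delete RoomTypeTable on RoomTypeTable.roomNo == roomNumber;
class DeleteModel {
    // One row of RoomTypeTable (roomNo int, type string).
    record Room(int roomNo, String type) {}

    // Called once per DeleteStream event; returns how many rows were removed.
    // row.roomNo() plays the role of RoomTypeTable.roomNo, and roomNumber is the
    // unqualified attribute of the incoming event.
    static int onDeleteEvent(List<Room> roomTypeTable, int roomNumber) {
        int before = roomTypeTable.size();
        roomTypeTable.removeIf(row -> row.roomNo() == roomNumber);
        return before - roomTypeTable.size();
    }
}
```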
Update
Events in an event table can be updated using an update query with the following syntax:
from <input stream name> select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ... update <table name> on <condition>
Here, "on <condition>" selects the events to be updated. When writing this condition, event table attributes must always be qualified with the table name, while the attributes from the select clause must not be qualified.
Each "<table attribute name>" must use the same name as the corresponding attribute defined in the event table. This allows Siddhi to identify which attributes to update in the event table.
E.g. For each room, identified by its number, update the room type in RoomTypeTable based on the events in UpdateStream.
define table RoomTypeTable (roomNo int, type string); define stream UpdateStream (roomNumber int, roomType string); from UpdateStream select roomType as type update RoomTypeTable on RoomTypeTable.roomNo == roomNumber;
To execute an update only for a specific output event category, use "update <table name> for <output event category> on <condition>" instead of "update <table name> on <condition>", where "<output event category>" is one of "current events", "expired events" or "all events".
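In the update example, only roomType is selected (as type), so only the type attribute of matching rows changes, and no row is inserted when nothing matches. The plain-Java sketch below models that behaviour. UpdateModel and its members are hypothetical names used for illustration only.

```java
import java.util.List;

// Hypothetical model of:
//   from UpdateStream select roomType as type update RoomTypeTable on RoomTypeTable.roomNo == roomNumber;
class UpdateModel {
    // One row of RoomTypeTable; roomNo is not in the select clause, so it is never changed.
    static final class Room {
        final int roomNo;
        String type;

        Room(int roomNo, String type) {
            this.roomNo = roomNo;
            this.type = type;
        }
    }

    // Called once per UpdateStream event; returns the number of rows updated.
    static int onUpdateEvent(List<Room> roomTypeTable, int roomNumber, String roomType) {
        int updated = 0;
        for (Room row : roomTypeTable) {
            if (row.roomNo == roomNumber) { // on RoomTypeTable.roomNo == roomNumber
                row.type = roomType;        // select roomType as type
                updated++;
            }
        }
        return updated; // 0 means nothing matched; an update never inserts
    }
}
```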
Insert Overwrite
Events can be inserted into, or overwritten in, an event table using an insert-overwrite query with the following syntax:
from <input stream name> select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ... insert overwrite <table name> on <condition>
Here, "on <condition>" selects the events to be updated; if no event matches, a new event is inserted. When writing this condition, event table attributes must always be qualified with the table name, while the attributes from the select clause must not be qualified.
Each "<table attribute name>" must use the same name as the corresponding attribute defined in the event table. This allows Siddhi to identify which attributes to update or insert in the event table.
E.g. For each room, identified by its number, update the room type in RoomTypeTable based on the events in UpdateStream, or insert a new entry if the room does not exist.
define table RoomTypeTable (roomNo int, type string); define stream UpdateStream (roomNumber int, roomType string); from UpdateStream select roomNumber as roomNo, roomType as type insert overwrite RoomTypeTable on RoomTypeTable.roomNo == roomNo;
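Insert overwrite behaves like an "upsert": matching rows are overwritten with the selected attributes, and if no row matches, the selected event is inserted. The plain-Java sketch below models the example query. InsertOverwriteModel and its members are hypothetical names, not part of Siddhi.

```java
import java.util.List;

// Hypothetical model of:
//   from UpdateStream select roomNumber as roomNo, roomType as type
//   insert overwrite RoomTypeTable on RoomTypeTable.roomNo == roomNo;
class InsertOverwriteModel {
    record Room(int roomNo, String type) {}

    static void onEvent(List<Room> roomTypeTable, int roomNumber, String roomType) {
        Room selected = new Room(roomNumber, roomType); // the select clause
        boolean matched = false;
        for (int i = 0; i < roomTypeTable.size(); i++) {
            // on RoomTypeTable.roomNo == roomNo (roomNo is the selected output attribute)
            if (roomTypeTable.get(i).roomNo() == selected.roomNo()) {
                roomTypeTable.set(i, selected); // overwrite the matching row
                matched = true;
            }
        }
        if (!matched) {
            roomTypeTable.add(selected); // no match: insert
        }
    }
}
```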
In
Whether an attribute value exists in an event table can be checked using a condition with the following syntax:
<condition> in <table name>
Here, "<condition>" selects the matching attribute. When writing this condition, event table attributes must always be qualified with the table name, while the attributes of the incoming stream must not be qualified.
E.g. Output only the temperature events associated with server rooms, by checking ServerRoomTable.
define table ServerRoomTable (roomNo int); define stream TempStream (deviceID long, roomNo int, temp double); from TempStream[ServerRoomTable.roomNo == roomNo in ServerRoomTable] insert into ServerTempStream;
Join
A stream can be joined with an event table to retrieve data from that table. A simple join query can be used for this. The event table must not be associated with window operations in the join, because an event table is not an active construct. For the same reason, an event table cannot be joined with another event table in Siddhi.
E.g. Enrich the events in the temperature stream with their room type, based on RoomTypeTable.
define table RoomTypeTable (roomNo int, type string); define stream TempStream (deviceID long, roomNo int, temp double); from TempStream join RoomTypeTable on RoomTypeTable.roomNo == TempStream.roomNo select deviceID, RoomTypeTable.roomNo as roomNo, type, temp insert into EnhancedTempStream;
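Because the event table is passive, only arriving stream events drive the join. Each TempStream event is matched against the current table rows, and one output event is produced per matching row. The plain-Java sketch below models this behaviour. TableJoinModel and the record names are hypothetical and chosen to mirror the example streams.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of:
//   from TempStream join RoomTypeTable on RoomTypeTable.roomNo == TempStream.roomNo
//   select deviceID, RoomTypeTable.roomNo as roomNo, type, temp insert into EnhancedTempStream;
class TableJoinModel {
    record Room(int roomNo, String type) {}
    record Temp(long deviceID, int roomNo, double temp) {}
    record EnhancedTemp(long deviceID, int roomNo, String type, double temp) {}

    // Called for one arriving TempStream event; the table itself never triggers output.
    static List<EnhancedTemp> onTempEvent(List<Room> roomTypeTable, Temp event) {
        List<EnhancedTemp> out = new ArrayList<>();
        for (Room row : roomTypeTable) {
            if (row.roomNo() == event.roomNo()) {
                out.add(new EnhancedTemp(event.deviceID(), row.roomNo(), row.type(), event.temp()));
            }
        }
        return out; // empty when no row matches, so nothing is emitted
    }
}
```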
Event window
An event window is a window that can be shared across multiple queries. Events are inserted into it from one or more streams. The event window publishes current and/or expired events as its output, and when these events are published depends on the window type.
Event window definition
The syntax for an event window definition is as follows.
define window <event window name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... ) <window type>(<parameter>, <parameter>, …) <output event type>;
The above syntax contains the following:
Element | Description |
---|---|
<event window name> | A unique name for the window. |
<attribute name> <attribute type> | These elements define a set of attributes assigned specific types. |
<window type>(<parameter>, <parameter>, …) | Any inbuilt window type available in Siddhi, with its parameters. For the complete list of available window types, see Inbuilt Windows. |
<output event type> | The possible values for this parameter are output current events, output expired events, and output all events. If the output event type is not specified for an event window, it emits both current and expired events. |
Sample event window definitions
The output event type is not specified in the following window definition. Therefore, it emits both current and expired events as the output.
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second);
The output event type of the following window is output all events. Therefore, it emits both current and expired events as the output.
define window SensorWindow (name string, value float, roomNo int, deviceID string) timeBatch(1 second) output all events;
Insert Into
The query for inserting events into a window is similar to the query for inserting events into event streams, and uses the insert into <window name> code snippet. To restrict the events inserted into the window to a specific category, use one of the following keywords between the insert and into keywords:
Keyword | Purpose |
---|---|
current events | To insert only current events into the event window. |
expired events | To insert only expired events into the event window. |
all events | To insert both current and expired events into the event window. |
e.g., The following query inserts both current and expired events from an event stream named SensorStream into an event window named SensorWindow.
from SensorStream insert into SensorWindow;
Output
An event window can be used as a stream in any query. However, an ordinary window cannot be applied to the output of an event window.
e.g., The following query selects the name, the maximum of the value attribute (as maxValue), and roomNo from an event window named SensorWindow, and inserts them into an event stream named MaxSensorReadingStream.
from SensorWindow select name, max(value) as maxValue, roomNo insert into MaxSensorReadingStream;
Join
An event window can be joined with another event stream, an event table or an event window.
e.g., The following query sends an alert to an event stream named RegulatorActionStream if the temperature is greater than 30 and the regulator is off.
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);
define window TempWindow(deviceID long, roomNo int, temp double) time(1 min);
from TempStream[temp > 30.0]
insert into TempWindow;
from TempWindow join RegulatorStream[isOn == false]#window.length(1) as R
on TempWindow.roomNo == R.roomNo
select TempWindow.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;
Event trigger
Triggers create events periodically based on time, or once at Siddhi start. An event trigger generates events on an event stream with the same name as the trigger. That stream has a single attribute named "triggered_time" of type long.
Event trigger definition
An event trigger definition specifies when events are triggered, using the following syntax:
define trigger <trigger name> at {'start'| every <time interval>| '<cron expression>'};
With "'start'", an event is triggered at Siddhi start.
With "every <time interval>", an event is triggered periodically at the given time interval.
With "'<cron expression>'", an event is triggered periodically based on the given cron expression. See the Quartz Scheduler documentation for the cron expression format.
E.g. Trigger an event every 5 minutes.
define trigger FiveMinTriggerStream at every 5 min;
E.g. Trigger an event at 10:15 am every Monday, Tuesday, Wednesday, Thursday and Friday.
define trigger WeekdayMorningTriggerStream at '0 15 10 ? * MON-FRI';
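The 'start' and every <time interval> forms can be pictured as a scheduler that pushes one event carrying triggered_time (a long timestamp) into the trigger's stream. The plain-Java sketch below uses the standard ScheduledExecutorService to model this. It is hypothetical: PeriodicTriggerModel is a made-up name, the triggered_time value is assumed to be epoch milliseconds, and cron expressions, which Siddhi evaluates through Quartz, are not modelled.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical model of "define trigger ... at 'start'" and "at every <time interval>".
// The LongConsumer stands in for the trigger stream; its argument is triggered_time.
class PeriodicTriggerModel implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // at 'start': emit a single event as soon as possible.
    void atStart(LongConsumer triggerStream) {
        scheduler.execute(() -> triggerStream.accept(System.currentTimeMillis()));
    }

    // at every <time interval>: emit one event per period.
    ScheduledFuture<?> every(long interval, TimeUnit unit, LongConsumer triggerStream) {
        return scheduler.scheduleAtFixedRate(
                () -> triggerStream.accept(System.currentTimeMillis()), interval, interval, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow(); // stops all pending trigger events
    }
}
```

For example, every(5, TimeUnit.MINUTES, sink) roughly corresponds to the FiveMinTriggerStream definition above.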
Siddhi logger
The Siddhi logger is used to log arriving events at different logger priorities such as INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE.
The following syntax is used.
<void> log(<string> priority, <string> logMessage, <bool> isEventLogged)
The parameters used in the query are as follows.
Parameter | Description |
---|---|
priority | The logging priority. Possible values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE. If no value is specified for this parameter, INFO is printed as the priority by default. |
logMessage | This parameter allows you to specify a message to be printed in the log. |
isEventLogged | This parameter specifies whether the event body should be included in the log. Possible values are true and false . If no value is specified, the event body is not printed in the log by default. |
The following examples illustrate the variations of the Siddhi logger.
The following query logs the event with the INFO logging priority, because no priority is specified.
from StockStream#log() select * insert into OutStream;
The following query logs the event with the INFO logging priority (because no priority is specified) and the test message text.
from StockStream#log('test message') select * insert into OutStream;
The following query logs the event with the INFO logging priority, because no priority is specified. The event itself is printed in the log.
from StockStream#log(true) select * insert into OutStream;
The following query logs the event with the INFO logging priority (because no priority is specified) and the test message text. The event itself is printed in the log.
from StockStream#log('test message', true) select * insert into OutStream;
The following query logs the event with the WARN logging priority and the test message text.
from StockStream#log('warn','test message') select * insert into OutStream;
The following query logs the event with the WARN logging priority and the test message text. The event itself is printed in the log.
from StockStream#log('warn','test message',true) select * insert into OutStream;
Eval script
Eval scripts allow Siddhi to process events using functions written in other programming languages. Eval script functions are defined in the same way as event tables or streams, and can be referred to in queries in the same way as Siddhi's inbuilt functions.
Eval script definition
An eval script definition defines a function using the following syntax:
define function <function name>[<language name>] return <return type> { <operation of the function> };
"<function name>" is the name used to call the function in queries. Note that a function defined this way overrides any inbuilt function with the same name.
"<language name>" defines the execution language, e.g. JavaScript, R or Scala.
"<return type>" defines the return type of the function, which can be int, long, float, double, string, bool or object. The function implementer is responsible for returning the output in the defined <return type> for the function to work correctly.
"<operation of the function>" contains the execution logic of the function, written in the language given in <language name> and returning a value of type <return type>.
Supported Eval Script Languages
- JavaScript
- R
- Scala
JavaScript
E.g. A concatenation function in JavaScript.
define function concatFn[JavaScript] return string {
    var str1 = data[0];
    var str2 = data[1];
    var str3 = data[2];
    var response = str1 + str2 + str3;
    return response;
};
define stream TempStream(deviceID long, roomNo int, temp double);
from TempStream
select concatFn(roomNo,'-',deviceID) as id, temp
insert into DeviceTempStream;
R
E.g. A concatenation function in R.
define function concatFn[R] return string {
    return(paste(data, collapse=""));
};
define stream TempStream(deviceID long, roomNo int, temp double);
from TempStream
select concatFn(roomNo,'-',deviceID) as id, temp
insert into DeviceTempStream;
Scala
E.g. A concatenation function in Scala.
define function concatFn[Scala] return string {
    var concatenatedString = ""
    for (i <- 0 until data.length) {
        concatenatedString += data(i).toString
    }
    concatenatedString
};
define stream TempStream(deviceID long, roomNo int, temp double);
from TempStream
select concatFn(roomNo,'-',deviceID) as id, temp
insert into DeviceTempStream;
Siddhi extensions
Siddhi has an extension architecture that allows custom code and functions to be incorporated into Siddhi seamlessly. Extensions use the following syntax:
<namespace>:<function name>(<parameter1>, <parameter2>, ... )
Here, the namespace allows Siddhi to identify the function as an extension and to determine its extension group. The function name identifies the extension function within that group, and the parameters are the inputs passed to the extension for evaluation and/or configuration.
E.g. A window extension created with namespace foo and function name unique can be referred as follows:
from StockExchangeStream[price >= 20]#window.foo:unique(symbol) select symbol, price insert into StockQuote;
Extension types
Siddhi supports the following five types of extensions:
Function Extension
For each event, it consumes zero or more parameters and outputs a single attribute. It can be used to manipulate event attributes to generate new attributes, like the Function operator. Implemented by extending "org.wso2.siddhi.core.executor.function.FunctionExecutor".
E.g. "math:sin(x)": the sin function of the math extension returns the sine of its parameter x.
Aggregate Function Extension
For each event, it consumes zero or more parameters and outputs a single attribute holding an aggregated result based on the input parameters. It can be used in conjunction with a window to find aggregated results over that window, like the Aggregate Function operator. Implemented by extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator".
E.g. "custom:std(x)": the std aggregate function of the custom extension returns the standard deviation of x over the window assigned to its query.
Window Extension
Allows events to be collected and expired without altering the event format based on the given input parameters like the Window operator. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor".
E.g. "custom:unique(key)": the unique window of the custom extension emits every event as a current event upon arrival. When an event arrives with the same "key" value as a previously arrived event, the previously arrived event is emitted as an expired event.
Stream Function Extension
Allows events to be altered by adding one or more attributes to them. Events are output upon each event arrival. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor".
E.g. "custom:pol2cart(theta,rho)": the pol2cart function of the custom extension calculates the Cartesian coordinates x and y for each event and adds them as new attributes to the existing event.
Stream Processor Extension
Allows events to be collected and expired while altering the event format, based on the given input parameters. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.StreamProcessor".
E.g. "custom:perMinResults(arg1, arg2, ...)": the perMinResults function of the custom extension adds one or more attributes to each event based on its conversion logic, and emits the events as current events upon arrival. On expiration, it can emit expired events with appropriate attribute values, matching the attribute count and types of the current events.
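Two of the example extensions above are easy to picture outside Siddhi. The plain-Java sketch below shows the computation a pol2cart stream function would add to each event, and the current/expired behaviour described for unique(key). It is a conceptual model only. It does not use the Siddhi extension base classes, ExtensionSemanticsModel and its members are hypothetical names, theta is assumed to be in radians, and emitting the expired event before the new current event is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical models of the "custom:pol2cart" and "custom:unique" examples; not Siddhi code.
class ExtensionSemanticsModel {

    // Stream function: the two attributes (x, y) appended to each event.
    static double[] pol2cart(double theta, double rho) {
        return new double[] { rho * Math.cos(theta), rho * Math.sin(theta) };
    }

    // One event emitted by the window, flagged as current or expired.
    record Emitted<E>(boolean expired, E event) {}

    // Window: keeps only the latest event per key.
    static final class UniqueWindow<K, E> {
        private final Map<K, E> latest = new HashMap<>();

        List<Emitted<E>> onEvent(K key, E event) {
            List<Emitted<E>> out = new ArrayList<>();
            E previous = latest.put(key, event);
            if (previous != null) {
                out.add(new Emitted<>(true, previous)); // displaced event expires
            }
            out.add(new Emitted<>(false, event));       // every arrival is a current event
            return out;
        }
    }
}
```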
Available Extensions
Siddhi currently has several prewritten extensions, as follows:
Extensions released under Apache License v2 :
- math: Supporting mathematical operations
- str: Supporting String operations
- geo: Supporting geocode operations
- regex: Supporting regular expression operations
- time: Supporting time expression operations
- ml: Supporting Machine Learning expression operations
- timeseries: Supporting Time Series operations
- kf (Kalman Filter): Supporting filtering capabilities by detecting outliers of the data.
- map: Supports sending a map object within Siddhi stream definitions and using it inside queries.
- reorder: Supports reordering events from an unordered event stream using the K-slack algorithm.
Extensions released under GNU/GPL License v3 :
- geo: Supporting geographical processing operations
- r: Supporting R executions
- nlp: Supporting Natural Language Processing expression operations
- pmml: Supporting Predictive Model Markup Language expression operations
You can get them from https://github.com/wso2-gpl/siddhi
Writing Custom Extensions
Custom extensions can be written to cater for use-case-specific logic that is not available out of the box in Siddhi or as an existing extension.
To create a custom extension, do the following two things:
Implement the extension logic by extending the well-defined Siddhi interfaces. E.g. implement a UniqueWindowProcessor by extending org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.
package org.wso2.test;

import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;

public class UniqueWindowProcessor extends WindowProcessor { ... }
Add an extension mapping file that maps the extension class to its function name and namespace. The extension mapping file must be named "<namespace>.siddhiext". E.g. to map the UniqueWindowProcessor extension to the function name "unique" and the namespace "foo", name the mapping file foo.siddhiext and give it the following content:
# function name to class mapping of 'foo' extension
unique=org.wso2.test.UniqueWindowProcessor
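The mapping lines use "#" comments and name=value pairs, which are compatible with the java.util.Properties format. The sketch below assumes that format and shows how a call such as foo:unique(symbol) could be resolved to the mapped class name. It is a hypothetical resolver, not Siddhi's real extension loader, and SiddhiExtMappingModel is a made-up name. It also shows why the comment and the mapping must be on separate lines: if they are on one line, the mapping is swallowed by the comment.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;

// Hypothetical resolver for "<namespace>.siddhiext" mapping files, assuming Properties syntax.
class SiddhiExtMappingModel {

    // Parses the content of one mapping file (function name -> class name).
    static Properties load(String fileContent) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return mapping;
    }

    // Resolves "<namespace>:<function name>" using the mapping files, keyed by namespace.
    // Returns null when the namespace or function is unknown.
    static String resolve(String call, Map<String, Properties> byNamespace) {
        int colon = call.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("not an extension call: " + call);
        }
        Properties mapping = byNamespace.get(call.substring(0, colon));
        return mapping == null ? null : mapping.getProperty(call.substring(colon + 1));
    }
}
```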
Refer to the following for implementing the different types of Siddhi extensions, with examples: