This section explains how to embed WSO2 Siddhi 3.0 in a Java project. Embedding Siddhi in a Java project lets you use the Siddhi query language for real-time complex event processing without running a WSO2 DAS server. This is useful when you need complex event processing on embedded devices where WSO2 DAS cannot be deployed.
Follow the procedure below to use Siddhi 3.0 as a library.
Step 1: Creating a Java project
Create a Java project using Maven and include the following dependencies in its pom.xml file.
```xml
<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-core</artifactId>
    <version>3.0.2</version>
</dependency>
<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-query-api</artifactId>
    <version>3.0.2</version>
</dependency>
<dependency>
    <groupId>org.wso2.siddhi</groupId>
    <artifactId>siddhi-query-compiler</artifactId>
    <version>3.0.2</version>
</dependency>
```
Add the following repository configuration to the same file.
...
...
This guide explains, with examples, how to use Siddhi Query Language 3.0 with WSO2 DAS.
Introduction to Siddhi Query Language
Siddhi Query Language (SiddhiQL) is designed to process event streams to identify complex event occurrences. The following table provides definitions of a few terms in the Siddhi Query Language.
Term | Definition |
---|---|
Event Stream | A logical series of events ordered in time. |
Event Stream Definition | This defines the schema of an event stream (see /wiki/spaces/TESB/pages/32604253). An event stream has a unique name and a set of typed attributes whose uniquely identifiable names define its schema. |
Event | An event is associated with only one event stream, and all events of that stream have an identical set of attributes assigned specific types (or the same schema). An event contains a timestamp and the attribute values according to the schema. |
Attribute | An attribute has a unique name within the event stream. The attribute type can be string, int, long, float, double, bool or object. |
Query | A logical construct that derives new streams by combining existing streams. A query contains one or more input streams, handlers to modify those input streams, and an output stream to which it publishes its output events. |
Partition | A logical container that processes a subset of the queries based on a pre-defined rule of separation. |
Event Table | A structured representation of stored data, allowing stored data to be accessed and manipulated at runtime. |
Siddhi has the following language constructs:
- Event Stream Definitions
- Event Table Definitions
- Partitions
- Queries
The execution logic of Siddhi can be composed as an execution plan, and all the above language constructs can be written as a script in an execution plan. Each construct must be separated by a semicolon (;).
Event Stream
An event stream is a sequence of events with a defined schema. One or more event streams can be imported and manipulated using queries in order to identify complex event conditions, and new event streams are created to notify query responses.
Event Stream Definition
The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of attributes assigned specific types, with uniquely identifiable names within the stream.
```
define stream <stream name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );
```
E.g. A stream named TempStream can be created with the following attributes as shown below.
Attribute Name | Attribute Type |
---|---|
deviceID | long |
roomNo | int |
temp | double |
```
define stream TempStream (deviceID long, roomNo int, temp double);
```
Query
Each Siddhi query can consume one or more event streams and create a new event stream from them.
All queries contain an input section and an output section. Some also contain a projection section. A simple query with all three sections is as follows.
```
from <input stream name>
select <attribute name>, <attribute name>, ...
insert into <output stream name>
```
e.g., To derive only the room number and temperature from the TempStream event stream defined above, define a query as follows.
```
from TempStream
select roomNo, temp
insert into RoomTempStream;
```
Inferred stream: RoomTempStream is an inferred stream, i.e., it can be used as the input stream of another query without explicitly defining its event stream definition, because that definition is inferred from the query above.
Query Projection
SiddhiQL supports the following for query projection.
Action | Description |
---|---|
Selecting required attributes for projection | Select only some of the attributes of an input stream to be inserted into an output stream. e.g., The RoomTempStream query above selects only the roomNo and temp attributes from TempStream. |
Selecting all attributes for projection | Select all the attributes of an input stream to be inserted into an output stream, either by using the asterisk sign ( * ) or by omitting the select statement. |
Renaming attributes | Select attributes from the input streams and insert them into the output stream under different names, assigning each new name with the as keyword. |
Introducing the default value | Add a default value and assign it to an output attribute using the as keyword, e.g., 'start' as action in the Joins example. |
Using mathematical and logical expressions | Combine attributes in mathematical and logical expressions, evaluated according to SiddhiQL operator precedence, and assign each result to an output attribute using the as keyword (e.g., converting Celsius to Fahrenheit, or identifying server rooms). |
Functions
A function consumes zero, one or more function parameters and produces a result value.
Function parameters
Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.
Time is a special parameter, defined as an int value followed by its unit, i.e., <int> <unit>. The supported units are listed below. When evaluated, a time parameter returns its value in milliseconds as a long.
Unit | Syntax |
---|---|
Year | year, years |
Month | month, months |
Week | week, weeks |
Day | day, days |
Hour | hour, hours |
Minute | minute, minutes, min |
Second | second, seconds, sec |
Millisecond | millisecond, milliseconds |
E.g. Passing 1 hour and 25 minutes to test function.
```
test(1 hour 25 min)
```
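To make the millisecond conversion concrete, the following plain-Java sketch evaluates a time expression such as `1 hour 25 min` to the long value it represents. It is an illustration, not part of Siddhi's API: the class `TimeExpression` is hypothetical, multiple `<int> <unit>` pairs are assumed to add up, and year and month are omitted because this section does not state how many milliseconds Siddhi assigns to them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Siddhi class): evaluates a SiddhiQL-style time
// expression such as "1 hour 25 min" to milliseconds, as a long.
// Year and month are omitted because their lengths are not defined here.
final class TimeExpression {
    private static final Map<String, Long> UNIT_MILLIS = new HashMap<>();

    static {
        register(7L * 24 * 60 * 60 * 1000, "week", "weeks");
        register(24L * 60 * 60 * 1000, "day", "days");
        register(60L * 60 * 1000, "hour", "hours");
        register(60L * 1000, "minute", "minutes", "min");
        register(1000L, "second", "seconds", "sec");
        register(1L, "millisecond", "milliseconds");
    }

    private TimeExpression() {}

    private static void register(long millis, String... names) {
        for (String name : names) {
            UNIT_MILLIS.put(name, millis);
        }
    }

    // Adds up every "<int> <unit>" pair in the expression.
    static long toMillis(String expression) {
        String[] tokens = expression.trim().split("\\s+");
        if (tokens.length % 2 != 0) {
            throw new IllegalArgumentException("Expected <int> <unit> pairs: " + expression);
        }
        long total = 0;
        for (int i = 0; i < tokens.length; i += 2) {
            long value = Integer.parseInt(tokens[i]);
            Long unitMillis = UNIT_MILLIS.get(tokens[i + 1]);
            if (unitMillis == null) {
                throw new IllegalArgumentException("Unsupported unit: " + tokens[i + 1]);
            }
            total += value * unitMillis;
        }
        return total;
    }
}
```

For example, `TimeExpression.toMillis("1 hour 25 min")` yields 5100000, i.e., 3,600,000 ms plus 25 × 60,000 ms.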
Functions, mathematical expressions, and logical expressions can be used in a nested manner.
Inbuilt Functions
Siddhi supports the following inbuilt functions.
- coalesce
- convert
- instanceOfBoolean
- instanceOfDouble
- instanceOfFloat
- instanceOfInteger
- instanceOfLong
- instanceOfString
- UUID
- cast
- ifThenElse
e.g., The following query converts the room number to a string and adds a message ID to each event using the convert and UUID functions.
```
from TempStream
select convert(roomNo, 'string') as roomNo, temp, UUID() as messageID
insert into RoomTempStream;
```
Filter
Filters can be used with input streams to filter events based on the given filter condition. Filter conditions should be defined in square brackets next to the input stream name as shown below.
```
from <input stream name>[<filter condition>]
select <attribute name>, <attribute name>, ...
insert into <output stream name>
```
e.g., The following query filters events from server rooms (room numbers 100 to 109) that have a temperature greater than 40 degrees.
```
from TempStream[(roomNo >= 100 and roomNo < 110) and temp > 40]
select roomNo, temp
insert into HighTempStream;
```
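The filter condition is evaluated independently for every event: an event either passes into HighTempStream or is dropped. The plain-Java sketch below mimics that behaviour for the query above; `TempEvent` and `HighTempFilter` are hypothetical names used only for illustration and are not part of the Siddhi API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative event type mirroring TempStream (deviceID long, roomNo int, temp double).
final class TempEvent {
    final long deviceID;
    final int roomNo;
    final double temp;

    TempEvent(long deviceID, int roomNo, double temp) {
        this.deviceID = deviceID;
        this.roomNo = roomNo;
        this.temp = temp;
    }
}

// Illustrative model of:
//   from TempStream[(roomNo >= 100 and roomNo < 110) and temp > 40] select roomNo, temp ...
final class HighTempFilter {
    private HighTempFilter() {}

    // The filter condition, evaluated independently for each event.
    static boolean matches(TempEvent e) {
        return (e.roomNo >= 100 && e.roomNo < 110) && e.temp > 40;
    }

    // Events that fail the condition are dropped; the rest keep their arrival order.
    static List<TempEvent> apply(List<TempEvent> input) {
        List<TempEvent> output = new ArrayList<>();
        for (TempEvent e : input) {
            if (matches(e)) {
                output.add(e);
            }
        }
        return output;
    }
}
```

Applied to readings from room 105 (45.0 and 35.0 degrees), room 210 (50.0) and room 109 (40.5), only the 45.0 and 40.5 readings pass: room 210 is not a server room, and 35.0 is below the threshold.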
Window
Windows capture a subset of events from an input event stream, based on a given criterion, for calculations. A window is defined next to the input stream using the '#window.' prefix, and each input stream can have at most one window, as follows.
```
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, <parameter>, ... )
select <attribute name>, <attribute name>, ...
insert into <output stream name>
```
A window can emit two kinds of events for each event it consumes: a current event and an expired event. It emits a current event when a new event arrives at the window, and an expired event when an event in the window expires according to the window's criterion.
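The current/expired behaviour can be illustrated with a toy length window in plain Java, in the spirit of `#window.length(N)`: each arrival is emitted as a current event, and once the window holds N events the oldest one is evicted and emitted as an expired event. `LengthWindow` and `Emission` are hypothetical names; emitting the expired event before the current one is a choice of this sketch, and Siddhi's actual window processors also handle timestamps and batching that are not modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model of a length window that holds the last `size` events.
final class LengthWindow<T> {
    enum Kind { CURRENT, EXPIRED }

    // One window output: the event and whether it is a current or an expired event.
    static final class Emission<E> {
        final Kind kind;
        final E event;

        Emission(Kind kind, E event) {
            this.kind = kind;
            this.event = event;
        }

        @Override
        public String toString() {
            return kind + ":" + event;
        }
    }

    private final int size;
    private final Deque<T> buffer = new ArrayDeque<>();

    LengthWindow(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    // Processes one incoming event and returns everything the window emits for it.
    List<Emission<T>> onEvent(T event) {
        List<Emission<T>> out = new ArrayList<>();
        if (buffer.size() == size) {
            // The window is full: its oldest event leaves and is emitted as expired.
            out.add(new Emission<>(Kind.EXPIRED, buffer.removeFirst()));
        }
        buffer.addLast(event);
        // The new arrival is always emitted as a current event.
        out.add(new Emission<>(Kind.CURRENT, event));
        return out;
    }
}
```

With a window of length 2, the events a, b and c produce `[CURRENT:a]`, `[CURRENT:b]` and `[EXPIRED:a, CURRENT:c]`.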
Inbuilt Windows
Siddhi supports the following inbuilt windows.
Output Event Categories
Window output can be controlled by event category, i.e., current and expired events. Use the following keywords with the output stream to select which events are emitted:
- current events: Emits all events that arrive at the window. This is the default behavior if no event category is specified.
- expired events: Emits all events that expire from the window.
- all events: Emits all events that arrive at and expire from the window.
To use these keywords with an insert into statement, place them between 'insert' and 'into', as in the example below.
E.g. Delay all events in a stream by 1 minute.
```
from TempStream#window.time(1 min)
select *
insert expired events into DelayedTempStream;
```
Aggregate Functions
Aggregate functions can be used with windows to perform aggregate calculations within the defined window.
Inbuilt Aggregate Functions
Siddhi supports the following inbuilt aggregate functions.
E.g. On every event arrival and expiry, emit the average temperature of all rooms based on the events that arrived during the last 10 minutes.
```
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;
```
Group By
Group by allows us to group the aggregation based on group by attributes.
E.g. Find the average temperature per room and device ID for the last 10 min.
```
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
group by roomNo, deviceID
insert into AvgTempStream;
```
Having
Having allows us to filter events after aggregation and after processing at the selector.
E.g. Find the average temperature per room for the last 10 min and alert if it's more than 30 degrees.
```
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo
group by roomNo
having avgTemp > 30
insert into AlertStream;
```
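To see how `group by` and `having` interact, the following plain-Java sketch averages the temperatures per room for a fixed set of events, standing in for the contents of the 10-minute window, and keeps only the rooms whose average exceeds the threshold. Window expiry is deliberately left out, and `RoomReading` and `RoomAverages` are hypothetical names rather than Siddhi classes.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative reading: the roomNo and temp attributes of a TempStream event.
final class RoomReading {
    final int roomNo;
    final double temp;

    RoomReading(int roomNo, double temp) {
        this.roomNo = roomNo;
        this.temp = temp;
    }
}

// Illustrative model of
//   select avg(temp) as avgTemp, roomNo group by roomNo having avgTemp > 30
// evaluated over the events currently held in the window.
final class RoomAverages {
    private RoomAverages() {}

    // Returns roomNo -> avgTemp for every group whose average passes the having clause.
    static Map<Integer, Double> alerts(List<RoomReading> windowContents, double threshold) {
        // group by roomNo: accumulate {sum, count} per room.
        Map<Integer, double[]> groups = new TreeMap<>();
        for (RoomReading r : windowContents) {
            double[] acc = groups.computeIfAbsent(r.roomNo, k -> new double[2]);
            acc[0] += r.temp;
            acc[1] += 1;
        }
        Map<Integer, Double> result = new TreeMap<>();
        for (Map.Entry<Integer, double[]> group : groups.entrySet()) {
            double avgTemp = group.getValue()[0] / group.getValue()[1];
            // having filters on the aggregated value, after grouping.
            if (avgTemp > threshold) {
                result.put(group.getKey(), avgTemp);
            }
        }
        return result;
    }
}
```

The key point the sketch shows is ordering: a filter in square brackets acts on individual events before aggregation, whereas `having` acts on the aggregated value of each group.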
Output Rate Limiting
Output rate limiting allows queries to emit events periodically based on the condition specified.
Rate limiting follows the below syntax.
```
from <input stream name>...
select <attribute name>, <attribute name>, ...
output ({<output-type>} every (<time interval>|<event interval> events) | snapshot every <time interval>)
insert into <output stream name>
```
With "<output-type>", you specify which of the arrived events are emitted: "first", "last", and "all" emit only the first event, only the last event, or all events, respectively. If the keyword is omitted, it defaults to "all".
With "<time interval>", you specify the time interval for periodic event emission.
With "<event interval>", you specify the number of events that must arrive before each periodic event emission.
Based on number of events
Here, events are emitted each time the predefined number of events has arrived; you can specify whether to emit only the first event, only the last event, or all of the arrived events.
E.g. Emit last temperature event per sensor every 10 events
```
from TempStream
select *
group by deviceID
output last every 10 events
insert into LowRateTempStream;
```
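Following the description above, event-based rate limiting buffers arrivals and releases output each time N events have arrived. The plain-Java sketch below models `output last every N events` and `output all every N events` for a single stream without `group by` (so it ignores the per-sensor grouping of the example); `first` is left out to keep the sketch short. `EventRateLimiter` and `OutputType` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative model of `output last every <n> events` and `output all every <n> events`
// for a query without group by.
final class EventRateLimiter<T> {
    enum OutputType { LAST, ALL }

    private final OutputType type;
    private final int n;
    private final List<T> pending = new ArrayList<>();

    EventRateLimiter(OutputType type, int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.type = type;
        this.n = n;
    }

    // Buffers the event; returns the emitted events once n events have arrived, else nothing.
    List<T> onEvent(T event) {
        pending.add(event);
        if (pending.size() < n) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        if (type == OutputType.LAST) {
            return Collections.singletonList(batch.get(batch.size() - 1));
        }
        return batch;
    }
}
```

Feeding the events 1 to 7 through a limiter with N = 3 emits 3 and 6 in `LAST` mode, and 1 to 6 in `ALL` mode; event 7 stays buffered until two more events arrive.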
Based on time
Here, events are emitted at every predefined time interval; you can specify whether to emit only the first event, only the last event, or all events that arrived during the interval.
E.g. Emit all temperature events every 10 seconds.
```
from TempStream
output every 10 sec
insert into LowRateTempStream;
```
Periodic snapshot
This works best with windows. When the input stream has a window attached, snapshot rate limiting emits, at every predefined time interval, all current events that have arrived so far and do not yet have corresponding expired events. When no window is attached to the input stream, it emits only the last current event at every predefined time interval.
E.g. Emit a snapshot of the events in a 5-second time window every second.
```
from TempStream#window.time(5 sec)
output snapshot every 1 sec
insert into SnapshotTempStream;
```
Joins
A join merges two event streams based on a condition. Each stream must be associated with a window; if no window is assigned, #window.length(0) is assigned to the input event stream. During the joining process, each incoming event on either stream is matched against all events in the other input stream's window based on the given condition, and an output event is generated for each matching event pair.
The syntax of join looks like below.
```
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>}
join <input stream name>#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>}
on <join condition>
within <time gap>
select <attribute name>, <attribute name>, ...
insert into <output stream name>
```
With "on <join condition>", Siddhi joins only the events that match the condition.
With the "unidirectional" keyword, you control which stream triggers the joining process. By default, events arriving on both streams trigger the joining process; when the unidirectional keyword is used on an input stream, only events arriving on that stream trigger it. Note that the unidirectional keyword cannot be used on both input streams, because that is equivalent to the default behavior of not using the keyword at all.
With "within <time gap>", the joining process matches only events that are within the defined time gap of each other.
When projecting the joined events, refer to each attribute with its stream name (e.g., <stream name>.<attribute name>) or with its reference ID (e.g., <stream reference Id>.<attribute name>); the reference ID is needed especially when events of the same stream are joined. To project all attributes of the joined events, use "select *" or omit the "select" statement; however, this is only possible when the two streams have no attributes with the same name.
E.g. Switch on the temperature regulator, if it is not already on, in every room whose current temperature is greater than 30 degrees.
```
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

from TempStream[temp > 30.0]#window.time(1 min) as T
join RegulatorStream[isOn == false]#window.length(1) as R
on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;
```
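The matching step of this join can be modelled in plain Java: an arrival on either stream is compared with every event currently held in the other stream's window, and each matching pair yields one output. The sketch below follows the regulator example; `RegulatorJoin` and its nested types are hypothetical, a growing list stands in for the 1-minute time window (expiry is omitted), and the filters are applied before events enter their windows, as the placement of `[...]` before `#window` in the query suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the regulator join above; time-based expiry of the
// TempStream window is omitted to keep the sketch short.
final class RegulatorJoin {
    static final class Temp {
        final int roomNo;
        final double temp;

        Temp(int roomNo, double temp) {
            this.roomNo = roomNo;
            this.temp = temp;
        }
    }

    static final class Regulator {
        final long deviceID;
        final int roomNo;
        final boolean isOn;

        Regulator(long deviceID, int roomNo, boolean isOn) {
            this.deviceID = deviceID;
            this.roomNo = roomNo;
            this.isOn = isOn;
        }
    }

    // Output row: T.roomNo, R.deviceID, 'start' as action
    static final class Action {
        final int roomNo;
        final long deviceID;
        final String action = "start";

        Action(int roomNo, long deviceID) {
            this.roomNo = roomNo;
            this.deviceID = deviceID;
        }
    }

    private final List<Temp> tempWindow = new ArrayList<>(); // stands in for #window.time(1 min)
    private Regulator regulatorWindow;                       // #window.length(1)

    // A TempStream arrival is matched against the RegulatorStream window.
    List<Action> onTemp(Temp t) {
        List<Action> out = new ArrayList<>();
        if (!(t.temp > 30.0)) {
            return out; // dropped by [temp > 30.0]; never enters the window
        }
        tempWindow.add(t);
        if (regulatorWindow != null && regulatorWindow.roomNo == t.roomNo) {
            out.add(new Action(t.roomNo, regulatorWindow.deviceID));
        }
        return out;
    }

    // A RegulatorStream arrival is matched against every event in the TempStream window.
    List<Action> onRegulator(Regulator r) {
        List<Action> out = new ArrayList<>();
        if (r.isOn) {
            return out; // dropped by [isOn == false]
        }
        regulatorWindow = r; // length(1): the new event replaces the previous one
        for (Temp t : tempWindow) {
            if (t.roomNo == r.roomNo) {
                out.add(new Action(t.roomNo, r.deviceID));
            }
        }
        return out;
    }
}
```

In this sketch, a regulator event with isOn set to true is dropped before it reaches the window, so the length(1) window keeps the most recent regulator event that was off.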
Outer joins
The syntax of an outer join is as follows.
```
from <input stream name>[<filter condition>]#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>}
(left|right|full) outer join <input stream name>#window.<window name>(<parameter>, ... ) {unidirectional} {as <reference>}
on <join condition>
within <time gap>
select <attribute name>, <attribute name>, ...
insert into <output stream name>
```
Left outer join
A left outer join merges two event streams based on a condition, but it returns all events of the left stream even if there are no matching events in the right stream. Each stream must be associated with a window. During the joining process, each incoming event of each stream is matched against all events in the other stream's window based on the given condition, and an output event is generated for each matching event pair. In addition, an output event is generated for each incoming event of the left stream even if there is no matching event in the right stream.
e.g., The following query generates output events for all events in the stockStream stream, whether or not there is a match for the symbol in the twitterStream stream.
```
from stockStream#window.length(2)
left outer join twitterStream#window.length(1)
on stockStream.symbol == twitterStream.symbol
select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
insert all events into outputStream;
```
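What distinguishes a left outer join is the handling of a left-stream event that finds no partner: it is still emitted, and in the sketch below its right-stream attributes are null. This plain-Java model covers the arrival-triggered output of the stockStream/twitterStream example with its two length windows. `StockTweetLeftJoin` and its nested types are hypothetical, and the expired-event output requested by `insert all events` is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model of the left outer join above (arrival-triggered output only).
final class StockTweetLeftJoin {
    static final class Stock {
        final String symbol;
        final double price;

        Stock(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    static final class Tweet {
        final String symbol;
        final String tweet;

        Tweet(String symbol, String tweet) {
            this.symbol = symbol;
            this.tweet = tweet;
        }
    }

    // Output row: stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
    static final class Row {
        final String symbol;
        final String tweet; // null when the left event had no match
        final double price;

        Row(String symbol, String tweet, double price) {
            this.symbol = symbol;
            this.tweet = tweet;
            this.price = price;
        }
    }

    private final Deque<Stock> stockWindow = new ArrayDeque<>(); // #window.length(2)
    private final Deque<Tweet> tweetWindow = new ArrayDeque<>(); // #window.length(1)

    private static <E> void push(Deque<E> window, E event, int length) {
        if (window.size() == length) {
            window.removeFirst();
        }
        window.addLast(event);
    }

    // Left-stream arrival: one row per match, or a single row with a null tweet.
    List<Row> onStock(Stock s) {
        push(stockWindow, s, 2);
        List<Row> out = new ArrayList<>();
        for (Tweet t : tweetWindow) {
            if (t.symbol.equals(s.symbol)) {
                out.add(new Row(s.symbol, t.tweet, s.price));
            }
        }
        if (out.isEmpty()) {
            out.add(new Row(s.symbol, null, s.price));
        }
        return out;
    }

    // Right-stream arrival: only matching pairs produce output.
    List<Row> onTweet(Tweet t) {
        push(tweetWindow, t, 1);
        List<Row> out = new ArrayList<>();
        for (Stock s : stockWindow) {
            if (s.symbol.equals(t.symbol)) {
                out.add(new Row(s.symbol, t.tweet, s.price));
            }
        }
        return out;
    }
}
```

Swapping which side gets the null-padded fallback gives the right outer join, and applying it on both sides gives the full outer join described below.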
Right outer join
This is similar to a left outer join, but it returns all events of the right stream even if there are no matching events in the left stream. Incoming events of the left stream are matched against all events in the right stream's window based on the given condition, and an output event is generated for each matching event pair. In addition, an output event is generated for each incoming event of the right stream even if there is no matching event in the left stream.
e.g., The following query generates output events for all events in the twitterStream stream, whether or not there is a match for the symbol in the stockStream stream.
```
from stockStream#window.length(2)
right outer join twitterStream#window.length(1)
on stockStream.symbol == twitterStream.symbol
select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
insert all events into outputStream;
```
Full outer join
The full outer join combines the results of left outer join and right outer join. An output event is generated for each incoming event even if there are no matching events in the other stream.
e.g., The following generates output events for all the incoming events of each stream whether there is a match for the symbol in the other stream or not.
```
from stockStream#window.length(2)
full outer join twitterStream#window.length(1)
on stockStream.symbol == twitterStream.symbol
select stockStream.symbol as symbol, twitterStream.tweet, stockStream.price
insert all events into outputStream;
```
Pattern
A pattern correlates event streams over time and detects event patterns based on the order of event arrival. With a pattern, other events can arrive in between the events that match the pattern condition. Internally, Siddhi creates state machines to track the state of the matching process. A pattern can correlate events over multiple input streams or over the same input stream; therefore, each matched input event must be referenced so that it can be accessed for further processing and output generation.
The syntax of pattern looks like below.
```
from {every} <input event reference>=<input stream name>[<filter condition>] -> {every} <input event reference>=<input stream name>[<filter condition>] -> ...
within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>
```
Input streams cannot be associated with a window.
With "->", you correlate incoming event arrivals, allowing zero or more other events to arrive in between the matching events.
With "<input event reference>=", the matched event is stored for future reference.
With "within <time gap>", the pattern matches only events that are within the defined time gap of each other.
Without the "every" keyword, the pattern matches only once; use the "every" keyword appropriately to trigger a new pattern matching process upon event arrival.
E.g. Alert if temperature of a room increases by 5 degrees within 10 min.
```
from every( e1=TempStream ) -> e2=TempStream[e1.roomNo == roomNo and (e1.temp + 5) <= temp]
within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
```
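The pattern above can be pictured as a set of partial matches: because of `every`, each TempStream event opens a new partial match as e1, and a later event from the same room that is at least 5 degrees warmer and arrives within 10 minutes completes it. The plain-Java sketch below models that state handling with explicit millisecond timestamps. It is a simplified illustration, not Siddhi's state-machine implementation: a completed partial match is discarded, and unrelated events in between are simply skipped, as `->` allows. `RisingTempPattern` and its nested types are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative model of:
//   from every( e1=TempStream ) -> e2=TempStream[e1.roomNo == roomNo and (e1.temp + 5) <= temp]
//   within 10 min
final class RisingTempPattern {
    static final class Reading {
        final long timestamp; // event time in milliseconds
        final int roomNo;
        final double temp;

        Reading(long timestamp, int roomNo, double temp) {
            this.timestamp = timestamp;
            this.roomNo = roomNo;
            this.temp = temp;
        }
    }

    // Output: e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
    static final class Alert {
        final int roomNo;
        final double initialTemp;
        final double finalTemp;

        Alert(int roomNo, double initialTemp, double finalTemp) {
            this.roomNo = roomNo;
            this.initialTemp = initialTemp;
            this.finalTemp = finalTemp;
        }
    }

    private static final long WITHIN_MILLIS = 10L * 60 * 1000; // within 10 min
    private final List<Reading> pendingE1 = new ArrayList<>();  // partial matches awaiting e2

    List<Alert> onEvent(Reading r) {
        List<Alert> alerts = new ArrayList<>();
        Iterator<Reading> it = pendingE1.iterator();
        while (it.hasNext()) {
            Reading e1 = it.next();
            if (r.timestamp - e1.timestamp > WITHIN_MILLIS) {
                it.remove(); // too old to complete within the time gap
            } else if (e1.roomNo == r.roomNo && e1.temp + 5 <= r.temp) {
                alerts.add(new Alert(e1.roomNo, e1.temp, r.temp));
                it.remove(); // this partial match is now complete
            }
            // Otherwise keep waiting: "->" allows unrelated events in between.
        }
        // `every`: each arrival also starts a new partial match as e1.
        pendingE1.add(r);
        return alerts;
    }
}
```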
Logical Pattern
A pattern not only matches events arriving in temporal order, but can also correlate events that have logical relationships.
Keywords such as "and" and "or" can be used instead of "->" to express the logical relationship.
With "and", the occurrence of two events in any order can be matched.
With "or", the occurrence of an event from either of the input streams can be matched.
E.g. Alert when the room temperature reaches the temperature set on the regulator (the pattern matching should be reset whenever the temperature set on the regulator changes).
```
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, tempSet double);

from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo == roomNo and e1.tempSet <= temp] or e3=RegulatorStream[e1.roomNo == roomNo]
select e1.roomNo, e2.temp as roomTemp
having e3 is null
insert into AlertStream;
```
Counting Pattern
A counting pattern matches multiple events based on the same matching condition. The expected number of events can be limited using the following postfixes.
With <1:4>, 1 to 4 events are matched.
With <2:>, 2 or more events are matched, and with <:5>, up to 5 events.
With <5>, exactly 5 events are matched.
To refer to specific occurrences of the events matched based on count limits, use square brackets with a numerical index or the "last" keyword. Indexes start at 0: e1[0] refers to the first matched event, e1[3] to the fourth, e1[last] to the last, and e1[last - 1] to the event before the last event of the matched event group.
E.g. Get the temperature difference between two regulator events.
```
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool);

from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo == roomNo]<1:> -> e3=RegulatorStream[e1.roomNo == roomNo]
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
insert into TempDiffStream;
```
Sequence
A sequence correlates event streams over time and detects event sequences based on the order of event arrival. With a sequence, no other events can arrive in between the events that match the sequence condition. Internally, Siddhi creates state machines to track the state of the matching process. A sequence can correlate events over multiple input streams or over the same input stream; therefore, each matched input event must be referenced so that it can be accessed for further processing and output generation.
The syntax of sequence looks like below.
```
from {every} <input event reference>=<input stream name>[<filter condition>], <input event reference>=<input stream name>[<filter condition>]{+|*|?}, ...
within <time gap>
select <input event reference>.<attribute name>, <input event reference>.<attribute name>, ...
insert into <output stream name>
```
Input streams cannot be associated with a window.
With ",", you correlate immediately consecutive event arrivals, with no other events arriving in between the matching events.
With "<input event reference>=", the matched event is stored for future reference.
With "within <time gap>", the sequence matches only events that are within the defined time gap of each other.
Without the "every" keyword, the sequence matches only once; use the "every" keyword at the beginning to trigger a sequence matching process upon every event arrival.
E.g. Alert if there is more than 1 degree increase in temperature between two consecutive temperature events.
```
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
```
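Because `,` requires e2 to be the very next event after e1, this sequence reduces to comparing each TempStream event with the one immediately following it. The plain-Java sketch below models that over a list of temperatures from a single stream; `ConsecutiveRise` is a hypothetical name, and `within` is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of:
//   from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
final class ConsecutiveRise {
    private ConsecutiveRise() {}

    // Returns {initialTemp, finalTemp} for every qualifying pair of consecutive events.
    static List<double[]> alerts(List<Double> temps) {
        List<double[]> out = new ArrayList<>();
        for (int i = 0; i + 1 < temps.size(); i++) {
            double e1 = temps.get(i);     // `every`: each event starts a new sequence
            double e2 = temps.get(i + 1); // ",": e2 must be the very next event
            if (e1 + 1 < e2) {
                out.add(new double[] {e1, e2});
            }
        }
        return out;
    }
}
```

For the temperatures 20.0, 20.5, 22.0, 22.5 and 25.0, it reports (20.5, 22.0) and (22.5, 25.0). A pattern using `->` would additionally match 20.0 followed later by 22.0, since patterns allow other events in between.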
Logical Sequence
A sequence not only matches consecutive events arriving in temporal order, but can also correlate events that have logical relationships.
Keywords such as "and" and "or" can be used instead of "," to express the logical relationship.
With "and", the occurrence of two events in any order can be matched.
With "or", the occurrence of an event from either of the input streams can be matched.
E.g. Notify when a regulator event is followed by both the temperature and humidity events.
```
define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);

from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;
```
Counting Sequence
A counting sequence matches multiple consecutive events based on the same matching condition. The expected number of events can be limited using the following postfixes.
With "*", zero or more events can be matched.
With "+", one or more events can be matched.
With "?", zero or one event can be matched.
To refer to specific occurrences of the events matched based on count limits, use square brackets with a numerical index or the "last" keyword. Indexes start at 0: e1[0] refers to the first matched event, e1[3] to the fourth, e1[last] to the last, and e1[last - 1] to the event before the last event of the matched event group.
E.g. Identify peak temperatures.
```
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, tempSet double, isOn bool);

from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[e2[last].temp > temp]
select e1.temp as initialTemp, e2[last].temp as peakTemp
insert into TempDiffStream;
```
Partition
With partitions, Siddhi divides both incoming events and queries, and processes them in parallel and in isolation. Each partition is tagged with a partition key, and only the events corresponding to that partition key are processed in that partition. Each partition has separate instances of the Siddhi queries, which isolates their processing states. A partition can contain more than one query.
A partition key can be defined using a categorical attribute of the input event stream, which is called a variable partition, or by defining separate ranges over a numerical attribute of the input event stream, which is called a range partition.
Variable Partition
A partition using categorical attributes follows the syntax below.
```
partition with ( <attribute name> of <stream name>, <attribute name> of <stream name>, ... )
begin
    <query>
    <query>
    ...
end;
```
E.g. Per sensor, calculate the maximum temperature over last 10 temperature events each sensor has emitted.
```
partition with ( deviceID of TempStream )
begin
    from TempStream#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into DeviceTempStream
end;
```
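Conceptually, a partition keeps a separate copy of the query state for every partition key. The plain-Java sketch below models the per-sensor example: each deviceID gets its own length-10 buffer, so one sensor's readings never affect another sensor's maximum. `PerDeviceMaxTemp` is a hypothetical name, and a map of buffers is only a stand-in for Siddhi's per-key query instances.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative model of:
//   partition with ( deviceID of TempStream )
//   begin from TempStream#window.length(10) select ..., max(temp) as maxTemp ... end;
final class PerDeviceMaxTemp {
    private static final int WINDOW_LENGTH = 10; // #window.length(10)

    // One isolated window per partition key (deviceID).
    private final Map<Long, Deque<Double>> windows = new HashMap<>();

    // Returns max(temp) over the last 10 events of this event's deviceID partition.
    double onEvent(long deviceID, double temp) {
        Deque<Double> window = windows.computeIfAbsent(deviceID, k -> new ArrayDeque<>());
        if (window.size() == WINDOW_LENGTH) {
            window.removeFirst();
        }
        window.addLast(temp);
        return Collections.max(window);
    }
}
```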
Range Partition
Partition using numerical attributes will adhere to the following syntax.
```
partition with ( <condition> as <partition key> or <condition> as <partition key> or ... of <stream name>, ... )
begin
    <query>
    <query>
    ...
end;
```
E.g. Per office area calculate the average temperature over last 10 minutes.
```
partition with ( roomNo >= 1030 as 'serverRoom' or roomNo < 1030 and roomNo >= 330 as 'officeRoom' or roomNo < 330 as 'lobby' of TempStream )
begin
    from TempStream#window.time(10 min)
    select roomNo, deviceID, avg(temp) as avgTemp
    insert into AreaTempStream
end;
```
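The range conditions effectively define a function from roomNo to a partition key, and each key then gets its own isolated query state. The plain-Java sketch below spells out that mapping for the example and averages temperatures separately per key. `AreaPartition` is a hypothetical name, and the 10-minute window is ignored to keep the focus on routing.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of the range partition above.
final class AreaPartition {
    private AreaPartition() {}

    // Mirrors the range conditions: each roomNo maps to exactly one partition key.
    static String keyFor(int roomNo) {
        if (roomNo >= 1030) {
            return "serverRoom";
        } else if (roomNo >= 330) { // roomNo < 1030 and roomNo >= 330
            return "officeRoom";
        } else {
            return "lobby";         // roomNo < 330
        }
    }

    // Averages temperatures separately per partition key, as isolated query instances would.
    static Map<String, Double> averageByArea(int[] roomNos, double[] temps) {
        if (roomNos.length != temps.length) {
            throw new IllegalArgumentException("roomNos and temps must have the same length");
        }
        Map<String, double[]> acc = new TreeMap<>(); // key -> {sum, count}
        for (int i = 0; i < roomNos.length; i++) {
            double[] a = acc.computeIfAbsent(keyFor(roomNos[i]), k -> new double[2]);
            a[0] += temps[i];
            a[1] += 1;
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }
}
```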
Inner Streams
Inner streams let the query instances of a partition communicate with other query instances of the same partition. Inner streams are denoted by a "#" prefix, and they cannot be accessed outside the partition block.
E.g. Per sensor, calculate the maximum temperature over last 10 temperature events when the sensor is having an average temperature greater than 20 over the last minute.
```
partition with ( deviceID of TempStream )
begin
    from TempStream#window.time(1 min)
    select roomNo, deviceID, temp, avg(temp) as avgTemp
    insert into #AvgTempStream

    from #AvgTempStream[avgTemp > 20]#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into deviceTempStream
end;
```
Event Table
An event table allows Siddhi to work with stored events; it can be viewed as a stored version of an event stream, or a table of events. By default, events are stored in memory, and Siddhi also provides an extension to work with data/events stored in RDBMS data stores.
Event Table Definition
Event Table Definition defines the event table schema. An Event Table Definition contains a unique name and a set of typed attributes with uniquely identifiable names within the table.
```
define table <table name> (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );
```
With the above definition, events will be stored in-memory via a linked list data structure.
E.g. Room type table with name RoomTypeTable can be created as below with attributes room number as int and type as string.
```
define table RoomTypeTable (roomNo int, type string);
```
Indexing Event Table
An event table can be indexed for fast event access using the "IndexBy" annotation. With "IndexBy", only one attribute can be indexed, and an indexed table uses a map data structure to hold the events. Therefore, if multiple events with the same index value are inserted into the event table, only the last inserted event remains in the table.
E.g. An indexed room type table, indexed on the room number attribute, can be created as below with the name RoomTypeTable and the attributes room number as int and type as string.
```
@IndexBy('roomNo')
define table RoomTypeTable (roomNo int, type string);
```
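The difference between a plain and an indexed in-memory event table can be illustrated with ordinary Java collections: a linked list keeps every inserted event, while a map keyed on the indexed attribute keeps only the last event per key. This is a sketch of the behaviour described above, not Siddhi's table implementation; `RoomTypeTables` and `RoomType` are hypothetical stand-ins for RoomTypeTable and its rows.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Illustrative model of the two in-memory table variants described above.
final class RoomTypeTables {
    // One RoomTypeTable row: (roomNo int, type string)
    static final class RoomType {
        final int roomNo;
        final String type;

        RoomType(int roomNo, String type) {
            this.roomNo = roomNo;
            this.type = type;
        }
    }

    // define table RoomTypeTable (...): every inserted event is kept, in insertion order.
    final List<RoomType> plainTable = new LinkedList<>();
    // @IndexBy('roomNo'): one event per roomNo; a later insert replaces the earlier one.
    final Map<Integer, RoomType> indexedTable = new HashMap<>();

    // Inserts the same row into both variants so their contents can be compared.
    void insert(RoomType row) {
        plainTable.add(row);
        indexedTable.put(row.roomNo, row);
    }
}
```

After inserting (101, office), (101, server) and (202, lobby), the plain table holds three rows, while the indexed table holds two, with room 101 mapped to "server".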
Hazelcast Event Table
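Event tables also support persisting event data in a distributed manner using Hazelcast in-memory data grids. This functionality can be enabled using the "From" annotation, which can also assign the Hazelcast cluster connection instructions to the event table.
Info: The connection instructions that can be used with the "From" annotation are shown in the following examples: a table with no connection instructions, a table in a named, password-protected cluster (cluster.name, cluster.password), and a table in a cluster at explicit member addresses (cluster.addresses).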
```
@from(eventtable = 'hazelcast')
define table RoomTypeTable(roomNo int, type string);
```
```
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a')
define table RoomTypeTable(roomNo int, type string);
```
```
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a', cluster.addresses='192.168.1.1:5700,192.168.1.2:5700')
define table RoomTypeTable(roomNo int, type string);
```
RDBMS Event Table
An event table can be backed by an RDBMS event store using the "From" annotation. With "From", either the data source or the connection instructions can be assigned to the event table. The RDBMS table name can differ from the event table name defined in Siddhi, and Siddhi always refers to the defined event table name. However, the defined event table name cannot be the same as an existing stream name, since both are treated the same syntactically within the Siddhi query language.
The RDBMS event table has been tested with the following databases:
- MySQL
- H2
- Oracle
E.g. Create an event table named RoomTypeTable, with the attributes room number as int and type as string, backed by an RDBMS table named RoomTable from the data source named AnalyticsDataSource.
```
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);
```
E.g. Create an event table named RoomTypeTable, with the attributes room number as int and type as string, backed by a MySQL table named RoomTable in the database cepdb at localhost:3306, using the user name "root" and the password "root".
```
@From(eventtable='rdbms', jdbc.url='jdbc:mysql://localhost:3306/cepdb', username='root', password='root', driver.name='com.mysql.jdbc.Driver', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);
```
Caching events with RDBMS Event Table
Several caches can be used with RDBMS-backed event tables to reduce I/O operations and improve performance. Currently, all cache implementations use size-based eviction algorithms. A cache can be added using the "cache" element of the "From" annotation, and its size can be defined using the "cache.size" element.
The supported cache implementations are as follows:
- Basic: Events are cached in a FIFO manner where the oldest event will be dropped when the cache is full.
- LRU (Least Recently Used): The least recently used event is dropped when the cache is full.
- LFU (Least Frequently Used): The least frequently used event is dropped when the cache is full.
In the "From" annotation, if the "cache" element is not specified, the "Basic" cache is used by default, and if the "cache.size" element is not specified, the default cache size of 4096 is used.
E.g. Create an event table named RoomTypeTable, with the attributes room number as int and type as string, backed by an RDBMS table and using the least recently used (LRU) caching algorithm to cache 3000 events.
```
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', cache='LRU', cache.size='3000')
define table RoomTypeTable (roomNo int, type string);
```
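The difference between the Basic (FIFO) and LRU eviction policies can be sketched in plain Java with java.util.LinkedHashMap, whose access-order constructor flag switches between insertion order and least-recently-used order. This is an illustrative sketch under that assumption, not the cache classes Siddhi uses; BoundedEventCache is a hypothetical name, and LFU is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical size-bounded cache. With lru=false the map keeps insertion
 * order, so the eldest entry is the oldest insert (FIFO, like "Basic").
 * With lru=true every get/put moves the entry to the end, so the eldest
 * entry is the least recently used one (LRU).
 */
class BoundedEventCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxSize;

    BoundedEventCache(int maxSize, boolean lru) {
        // Third argument is LinkedHashMap's accessOrder flag
        super(16, 0.75f, lru);
        this.maxSize = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put: drop the eldest entry once over the limit
        return size() > maxSize;
    }
}
```

With a size of 2, reading key 1 and then inserting key 3 evicts key 1 in FIFO mode but key 2 in LRU mode, which is exactly the difference between the Basic and LRU descriptions above.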
Using Bloom Filters
A Bloom filter is a probabilistic data structure that can be used to perform quick membership checks. If you apply a Bloom filter to a data set and carry out an isAvailable check on that Bloom filter instance, the answer is always accurate when the search item is not available: a Bloom filter can report false positives, but never false negatives. This makes updates, joins, and isAvailable checks faster.
The following example shows how to enable Bloom filters on an RDBMS event table and use that table in insert and isAvailable (in) queries.
```
define stream StockStream (symbol string, price float, volume long);
define stream CheckStockStream (symbol string, volume long);

@from(eventtable = 'rdbms', datasource.name = 'cepDB', table.name = 'stockInfo', bloom.filters = 'enable')
define table StockTable (symbol string, price float, volume long);

@info(name = 'query1')
from StockStream
insert into StockTable;

@info(name = 'query2')
from CheckStockStream[(StockTable.symbol == symbol) in StockTable]
insert into OutStream;
```
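The "definitely absent" guarantee described above can be shown with a minimal Bloom filter in plain Java. SimpleBloomFilter is a hypothetical class, and its bit-array size, number of hash functions, and double-hashing scheme are illustrative assumptions rather than Siddhi's settings.

```java
import java.util.BitSet;

/** Hypothetical minimal Bloom filter over string keys. */
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    SimpleBloomFilter(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Derive the i-th bit position from two base hashes (double hashing)
    private int index(String item, int i) {
        int h1 = item.hashCode();
        int h2 = (h1 >>> 16) | 1;  // force an odd, non-zero step between probes
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String item) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(index(item, i));
        }
    }

    /** false means "definitely absent"; true only means "possibly present". */
    boolean mightContain(String item) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(index(item, i))) {
                return false;
            }
        }
        return true;
    }
}
```

A false result lets a check skip the backing table entirely; a true result still needs a real lookup, because different items can set the same bits.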
For more information about in-memory event tables, see Sample 0106 - Using in-memory event tables.
For more information about RDBMS event tables, see Sample 0107 - Using RDBMS event tables.
Insert into
A query for inserting events into a table is similar to a query for inserting events into an event stream: it uses the "insert into <table name>" clause. To insert only a specific output event category, place the "current events", "expired events", or "all events" keyword between the 'insert' and 'into' keywords.
E.g. Insert all temperature events from TempStream into the temperature table TempTable.
```
from TempStream
select *
insert into TempTable;
```
Delete
Events in an event table can be deleted using a delete query with the following syntax:
```
from <input stream name>
select <attribute name>, <attribute name>, ...
delete <table name>
    on <condition>
```
Here, "on <condition>" selects the events to be deleted. When writing this condition, always qualify event table attribute names with the table name, and do not qualify the attributes of the select clause.
E.g. Delete the entries of RoomTypeTable associated with the room numbers in DeleteStream.
```
define table RoomTypeTable (roomNo int, type string);
define stream DeleteStream (roomNumber int);

from DeleteStream
delete RoomTypeTable
    on RoomTypeTable.roomNo == roomNumber;
```
To execute the delete only for a specific output event category, use "delete <table name> for <output event category> on <condition>" instead of "delete <table name> on <condition>", where "<output event category>" is one of the "current events", "expired events", or "all events" keywords.
Update
Events in an event table can be updated using an update query with the following syntax:
```
from <input stream name>
select <attribute name> as <table attribute name>, <attribute name> as <table attribute name>, ...
update <table name>
    on <condition>
```
Here, "on <condition>" selects the events to be updated. When writing this condition, always qualify event table attribute names with the table name, and do not qualify the attributes of the select clause.
Each "<table attribute name>" should be the same as an attribute name defined in the event table, so that Siddhi can identify which attributes to update in the event table.
E.g. For each room, identified by its number, update the room type in RoomTypeTable based on the events in UpdateStream.
```
define table RoomTypeTable (roomNo int, type string);
define stream UpdateStream (roomNumber int, roomType string);

from UpdateStream
select roomType as type
update RoomTypeTable
    on RoomTypeTable.roomNo == roomNumber;
```
To execute the update only for a specific output event category, use "update <table name> for <output event category> on <condition>" instead of "update <table name> on <condition>", where "<output event category>" is one of the "current events", "expired events", or "all events" keywords.
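The effect of the update example can be sketched in plain Java, assuming a non-indexed table held as a list of rows. TableUpdateSketch and RoomTypeRow are hypothetical names used only for illustration; this is not how Siddhi executes update queries internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "update RoomTypeTable on RoomTypeTable.roomNo == roomNumber". */
class TableUpdateSketch {
    static final class RoomTypeRow {
        final int roomNo;
        String type;

        RoomTypeRow(int roomNo, String type) {
            this.roomNo = roomNo;
            this.type = type;
        }
    }

    // Non-indexed table: rows kept in insertion order
    final List<RoomTypeRow> rows = new ArrayList<>();

    /** Applies one UpdateStream event and returns how many rows matched. */
    int onUpdateEvent(int roomNumber, String roomType) {
        int updated = 0;
        for (RoomTypeRow row : rows) {
            // on RoomTypeTable.roomNo == roomNumber
            if (row.roomNo == roomNumber) {
                row.type = roomType;  // select roomType as type
                updated++;
            }
        }
        return updated;
    }
}
```

Only the attribute named in the select clause (type) changes; rows that do not satisfy the condition are left untouched.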
In
Whether an attribute value is in an event table can be checked using a condition with the following syntax:
```
<condition> in <table name>
```
Here, "<condition>" selects the matching attribute. When writing this condition, always qualify event table attribute names with the table name, and do not qualify the attributes of the incoming stream.
E.g. Output only the temperature events associated with server rooms, by checking against ServerRoomTable.
```
define table ServerRoomTable (roomNo int);
define stream TempStream (deviceID long, roomNo int, temp double);

from TempStream[ServerRoomTable.roomNo == roomNo in ServerRoomTable]
insert into ServerTempStream;
```
Join
A stream can be joined with an event table to retrieve data from the event table. A simple join query can be used for this, but the event table must not be associated with window operations, because an event table is not an active construct. For the same reason, an event table cannot be joined with another event table in Siddhi.
E.g. Enrich the events in the temperature stream with their room types based on RoomTypeTable.
```
define table RoomTypeTable (roomNo int, type string);
define stream TempStream (deviceID long, roomNo int, temp double);

from TempStream join RoomTypeTable
    on RoomTypeTable.roomNo == TempStream.roomNo
select deviceID, RoomTypeTable.roomNo as roomNo, type, temp
insert into EnhancedTempStream;
```
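The join example can be sketched in plain Java as a scan of the table rows each time a TempStream event arrives. The classes below are hypothetical stand-ins for Siddhi events and table rows, and the linear scan is an assumption matching the linked-list storage of a non-indexed table.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "from TempStream join RoomTypeTable on ...roomNo == ...roomNo". */
class StreamTableJoinSketch {
    // Stand-in for a RoomTypeTable row
    static final class RoomTypeRow {
        final int roomNo;
        final String type;

        RoomTypeRow(int roomNo, String type) {
            this.roomNo = roomNo;
            this.type = type;
        }
    }

    // Stand-in for an EnhancedTempStream event
    static final class EnhancedTemp {
        final long deviceID;
        final int roomNo;
        final String type;
        final double temp;

        EnhancedTemp(long deviceID, int roomNo, String type, double temp) {
            this.deviceID = deviceID;
            this.roomNo = roomNo;
            this.type = type;
            this.temp = temp;
        }
    }

    /** Called once per TempStream event; the table is only read, never drives output. */
    static List<EnhancedTemp> onTempEvent(long deviceID, int roomNo, double temp,
                                          List<RoomTypeRow> table) {
        List<EnhancedTemp> out = new ArrayList<>();
        for (RoomTypeRow row : table) {
            if (row.roomNo == roomNo) {
                // select deviceID, RoomTypeTable.roomNo as roomNo, type, temp
                out.add(new EnhancedTemp(deviceID, row.roomNo, row.type, temp));
            }
        }
        return out;
    }
}
```

Since only stream arrivals drive the loop, changes to the table alone never produce join output, which reflects the point above that an event table is not an active construct.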
Event Trigger
Triggers allow you to create events periodically based on time, and at Siddhi start. An event trigger generates events on an event stream with the same name as the trigger, having only one attribute, named "triggered_time", of type long.
Event Trigger Definition
An Event Trigger Definition defines the event triggering interval using the following syntax.
```
define trigger <trigger name> at {'start'| every <time interval>| '<cron expression>'};
```
With "'start'", an event is triggered at Siddhi start.
With "every <time interval>", an event is triggered periodically at the given time interval.
With "'<cron expression>'", an event is triggered periodically based on the given cron expression; refer to the Quartz Scheduler documentation for configuration details.
E.g. Trigger an event every 5 minutes.
```
define trigger FiveMinTriggerStream at every 5 min;
```
E.g. Trigger an event at 10:15 am every Monday, Tuesday, Wednesday, Thursday and Friday.
```
define trigger WeekdayMorningTriggerStream at '0 15 10 ? * MON-FRI';
```
Eval Script
Eval script allows Siddhi to process events using functions written in other programming languages. Eval script functions are defined in the same way as event tables or streams, and can be referred to in queries like the Inbuilt Functions of Siddhi.
Eval Script Definition
An Eval Script Definition defines the function operation using the following syntax.
```
define function <function name>[<language name>] return <return type> {
    <operation of the function>
};
```
With "<function name>", the name of the function to be used in queries is defined. Note that this function overrides any Inbuilt Function with the same name.
With "<language name>", the execution language is defined, e.g. JavaScript, R, or Scala.
With "<return type>", the return type of the function is defined; it can be int, long, float, double, string, bool, or object. The function implementer is responsible for returning output of the defined <return type> for the function to work correctly.
With "<operation of the function>", the execution logic of the function is written in the language defined in <language name>, returning a value of <return type>.
Supported Eval Script Languages
- JavaScript
- R
- Scala
JavaScript
E.g. A concatenation function in JavaScript.
```
define function concatFn[JavaScript] return string {
    var str1 = data[0];
    var str2 = data[1];
    var str3 = data[2];
    var response = str1 + str2 + str3;
    return response;
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream
select concatFn(roomNo,'-',deviceID) as id, temp
insert into DeviceTempStream;
```
R
E.g. A concatenation function in R.
```
define function concatFn[R] return string {
    return(paste(data, collapse=""));
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream
select concatFn(roomNo,'-',deviceID) as id, temp
insert into DeviceTempStream;
```
Scala
E.g. A concatenation function in Scala.
Code Block | ||||
---|---|---|---|---|
| ||||
define function concatFn[Scala] return string { var concatenatedString = ""; for(i <- 0 until data.length){ concatenatedString += |
...
data(i).toString } concatenatedString }; define stream |
...
Info: You can create the Java project using any method you prefer. The required dependencies can be downloaded from here.
...
Define a stream as follows. The stream definition specifies the format of the incoming events.
```java
String definition = "@config(async = 'true') define stream cseEventStream (symbol string, price float, volume long);";
```
...
Define a Siddhi query as follows.
```java
String query = "@info(name = 'query1') from cseEventStream#window.timeBatch(500) select symbol, sum(price) as price, sum(volume) as volume group by symbol insert into outputStream ;";
```
This Siddhi query stores incoming events for 500 milliseconds, groups them by symbol, and calculates the sums of price and volume. It then inserts the results into a stream named outputStream.
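To make the expected output concrete, the following plain-Java sketch performs the same group-by-and-sum on one batch of events. It assumes that all nine events sent in Step 4 fall into a single 500 ms batch, which in practice depends on timing; the time-based batching itself is not modelled, and TimeBatchGroupBySketch is a hypothetical class, not part of the Siddhi API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of "select symbol, sum(price), sum(volume) group by symbol" on one batch. */
class TimeBatchGroupBySketch {
    // Running sums for one symbol (price accumulated as double in this sketch)
    static final class Totals {
        double price;  // sum(price) as price
        long volume;   // sum(volume) as volume
    }

    // Each event is an Object[] in cseEventStream order: symbol, price (Float), volume (Long)
    static Map<String, Totals> aggregate(Object[][] batch) {
        // LinkedHashMap only keeps first-seen symbol order for readability
        Map<String, Totals> bySymbol = new LinkedHashMap<>();
        for (Object[] event : batch) {
            Totals totals = bySymbol.computeIfAbsent((String) event[0], symbol -> new Totals());
            totals.price += (Float) event[1];
            totals.volume += (Long) event[2];
        }
        return bySymbol;
    }
}
```

For the Step 4 events, this gives ABC and DEF a price sum of 2100 and a volume sum of 300 each, and WSO2 a price sum of 181.5 and a volume sum of 600.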
Step 2: Creating an execution plan runtime
An execution plan is a self-contained, valid set of stream definitions and queries. This step involves creating a runtime representation of an execution plan by combining the stream definition and the Siddhi query you created in Step 1.
```java
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;

// Parse the stream definition and the query together as one execution plan
SiddhiManager siddhiManager = new SiddhiManager();
ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(definition + query);
```
In the above example, definition + query forms the execution plan. The Siddhi Manager parses the execution plan and provides you with an execution plan runtime. This execution plan runtime is used to add callbacks and input handlers to the execution plan.
Step 3: Registering a callback
You can register a callback to the execution plan runtime in order to receive the results once the events are processed. There are two types of callbacks.
- Query callback: This subscribes to a query.
- Stream callback: This subscribes to an event stream.
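In this example, a query callback is added because the execution plan has only one query.
```java
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.util.EventPrinter;

// Subscribe to the output of the query named "query1"
executionPlanRuntime.addCallback("query1", new QueryCallback() {
    @Override
    public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
        // inEvents are current events; removeEvents are expired events
        EventPrinter.print(timeStamp, inEvents, removeEvents);
    }
});
```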
Here, a new query callback is added to the query named query1. Once the results are generated, they are sent to the receive method of this callback. An event printer is added inside this callback to print the incoming events for demonstration purposes.
Step 4: Sending events
To send events into cseEventStream, and from there to the query, obtain an input handler as follows.
```java
import org.wso2.siddhi.core.stream.input.InputHandler;

// Obtain the handler that feeds events into cseEventStream
InputHandler inputHandler = executionPlanRuntime.getInputHandler("cseEventStream");
```
Use the following code to start the execution plan runtime and send events.
```java
executionPlanRuntime.start();

// Each Object[] follows the cseEventStream schema: symbol (string), price (float), volume (long)
inputHandler.send(new Object[]{"ABC", 700f, 100L});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
inputHandler.send(new Object[]{"DEF", 700f, 100L});
inputHandler.send(new Object[]{"ABC", 700f, 100L});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
inputHandler.send(new Object[]{"DEF", 700f, 100L});
inputHandler.send(new Object[]{"ABC", 700f, 100L});
inputHandler.send(new Object[]{"WSO2", 60.5f, 200L});
inputHandler.send(new Object[]{"DEF", 700f, 100L});

// timeBatch(500) emits its results only when the 500 ms batch ends, so wait
// before shutting down (the enclosing method must handle InterruptedException)
Thread.sleep(1000);
executionPlanRuntime.shutdown();
```
When the events are processed, the query results are printed by the event printer.
...
TempStream(deviceID long, roomNo int, temp double);
from TempStream
select concatFn(roomNo,'-',deviceID) as id, temp
insert into DeviceTempStream; |
Siddhi Extensions
Siddhi supports an extension architecture that allows custom code and functions to be incorporated into Siddhi seamlessly. Extensions use the following syntax:
```
<namespace>:<function name>(<parameter1>, <parameter2>, ... )
```
Here the namespace will allow Siddhi to identify the function as an extension and its extension group, the function name will denote the extension function within the given group, and the parameters will be the inputs that can be passed to the extension for evaluation and/or configuration.
E.g. A window extension created with the namespace foo and the function name unique can be referred to as follows:
```
from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote;
```
Extension Types
Siddhi supports the following five types of extensions:
Function Extension
For each event, it consumes zero or more parameters and outputs a single attribute. This can be used to manipulate event attributes to generate a new attribute, like the Function operator. Implemented by extending "org.wso2.siddhi.core.executor.function.FunctionExecutor".
E.g. In "math:sin(x)", the sin function of the math extension returns the sine of its parameter x.
Aggregate Function Extension
For each event, it consumes zero or more parameters and outputs a single attribute holding an aggregated result based on the input parameters. This can be used in conjunction with a window to find aggregated results over that window, like the Aggregate Function operator. Implemented by extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator".
E.g. In "custom:std(x)", the std aggregate function of the custom extension returns the standard deviation of x over the window assigned to its query.
Window Extension
Allows events to be collected and expired based on the given input parameters without altering the event format, like the Window operator. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor".
E.g. In "custom:unique(key)", the unique window of the custom extension emits every event as a current event upon arrival. When an event arrives with the same "key" value as a previously arrived event, the previously arrived event is emitted as an expired event.
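The unique-window behaviour can be sketched in plain Java by remembering the latest event for each key. UniqueWindowSketch is hypothetical, events are represented as strings for brevity, and emitting the expired event before the new current event is an assumed ordering that the description above does not specify.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a unique(key) window; not the actual extension code. */
class UniqueWindowSketch {
    // Latest event held in the window for each key value
    private final Map<String, String> latestByKey = new HashMap<>();

    /** Returns the events emitted for one arrival, tagged CURRENT or EXPIRED. */
    List<String> onEvent(String key, String event) {
        List<String> emitted = new ArrayList<>();
        // put returns the event previously held for this key, or null
        String previous = latestByKey.put(key, event);
        if (previous != null) {
            emitted.add("EXPIRED:" + previous);
        }
        emitted.add("CURRENT:" + event);
        return emitted;
    }
}
```

The window never changes the event format; it only decides when an event leaves the window, which is what distinguishes a window extension from a stream processor extension.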
Stream Function Extension
Allows events to be altered by adding one or more attributes to them. Here, events can be output upon each event arrival. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor".
E.g. In "custom:pol2cart(theta,rho)", the pol2cart function of the custom extension returns every event after calculating the Cartesian coordinates x and y and adding them as new attributes to the existing event.
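The conversion that such a stream function performs can be sketched in plain Java: the incoming attributes are kept and x = rho * cos(theta) and y = rho * sin(theta) are appended. Pol2CartSketch is hypothetical, the attribute positions are passed in as parameters, and theta is assumed to be in radians.

```java
import java.util.Arrays;

/** Hypothetical model of a pol2cart-style stream function; not the actual extension. */
class Pol2CartSketch {
    /** Returns a copy of the event with x and y appended as two new attributes. */
    static Object[] process(Object[] inputEvent, int thetaIndex, int rhoIndex) {
        double theta = ((Number) inputEvent[thetaIndex]).doubleValue();  // radians (assumed)
        double rho = ((Number) inputEvent[rhoIndex]).doubleValue();
        // Keep all existing attributes and make room for two more
        Object[] out = Arrays.copyOf(inputEvent, inputEvent.length + 2);
        out[inputEvent.length] = rho * Math.cos(theta);      // new attribute x
        out[inputEvent.length + 1] = rho * Math.sin(theta);  // new attribute y
        return out;
    }
}
```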
Stream Processor Extension
Allows events to be collected and expired while altering the event format based on the given input parameters. Implemented by extending "org.wso2.siddhi.core.query.processor.stream.StreamProcessor".
E.g. In "custom:perMinResults(arg1, arg2, ...)", the perMinResults function of the custom extension adds one or more attributes to the events based on its conversion logic and emits them as current events upon arrival. At expiration, it can emit expired events whose attribute values match the attribute count and types of the current events.
Available Extensions
Siddhi currently has several prewritten extensions, as follows:
Extensions released under Apache License v2 :
- math : Supporting mathematical operations
- str : Supporting String operations
- geo : Supporting geocode operations
- regex : Supporting regular expression operations
- time : Supporting time expression operations
- ml : Supporting Machine Learning expression operations
- timeseries : Supporting Time Series operations
- kf (Kalman Filter) : Supporting filtering capabilities by detecting outliers of the data.
- map : Supporting sending a map object inside Siddhi stream definitions and using it inside queries.
- reorder : Supporting reordering of events from an unordered event stream using the K-Slack algorithm.
Extensions released under GNU/GPL License v3 :
- geo : Supporting geographical processing operations
- r : Supporting R executions
- nlp : Supporting Natural Language Processing expression operations
- pmml : Supporting Predictive Model Markup Language expression operations
You can get them from https://github.com/wso2-gpl/siddhi
Writing Custom Extensions
Custom extensions can be written to cater for use case specific logic that is not available out of the box in Siddhi or as an existing extension.
To create a custom extension, two things need to be done:
1. Implement the extension logic by extending the well-defined Siddhi interfaces. E.g. Implementing a UniqueWindowProcessor by extending org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor:
```java
package org.wso2.test;

import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;

public class UniqueWindowProcessor extends WindowProcessor {
    ...
}
```
2. Add an extension mapping file that maps the written extension class to the extension function name and namespace. The extension mapping file must be named "<namespace>.siddhiext". E.g. To map the UniqueWindowProcessor extension to the function name "unique" and the namespace "foo", name the mapping file foo.siddhiext and give it the following content:
```text
# function name to class mapping of 'foo' extension
unique=org.wso2.test.UniqueWindowProcessor
```
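To show how the mapping file ties a namespace:function reference to a class, the following plain-Java sketch parses the file content with java.util.Properties, which accepts the key=value lines and '#' comments used above. ExtensionMappingSketch is a hypothetical helper written for illustration, not Siddhi's extension loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical resolver for "<namespace>:<function name>" references. */
class ExtensionMappingSketch {
    /**
     * Looks up the implementing class for a reference such as "foo:unique",
     * given the namespace the mapping file belongs to and the file's content.
     */
    static String resolve(String namespace, String mappingFileContent, String reference)
            throws IOException {
        int colon = reference.indexOf(':');
        if (colon < 0 || !reference.substring(0, colon).equals(namespace)) {
            return null;  // not a reference to this namespace
        }
        // '#' lines are comments; "unique=..." maps a function name to a class
        Properties mapping = new Properties();
        mapping.load(new StringReader(mappingFileContent));
        return mapping.getProperty(reference.substring(colon + 1));
    }
}
```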
Refer to the following for examples of implementing different types of Siddhi extensions: