This guide explains how to use Siddhi Query Language 3.0 with WSO2 CEP, illustrated with examples.
...
The execution logic of Siddhi is composed as an execution plan, and all of the above language constructs can be written as a script within an execution plan. Each construct must be separated by a semicolon (;).
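For example, a minimal execution plan script could combine a stream definition and a query, each terminated by a semicolon. This is an illustrative sketch; the output stream name RoomTempStream is only an example.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

from TempStream
select roomNo, temp
insert into RoomTempStream;
```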
Event stream
An event stream is a sequence of events with a defined schema. One or more event streams can be imported and manipulated using queries in order to identify complex event conditions, and new event streams are created to notify query responses.
Event stream definition
The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of typed attributes with uniquely identifiable names within the stream.
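For example (the general shape is `define stream <stream name> (<attribute name> <attribute type>, ...);`), the TempStream stream used throughout this guide is defined as follows:

```sql
define stream TempStream (deviceID long, roomNo int, temp double);
```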
...
Inferred Stream: Here RoomTempStream is an inferred stream, i.e. RoomTempStream can be used as an input stream for another query without explicitly defining its event stream definition, because its event stream definition is inferred from the above query.
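As a sketch of this behaviour (the stream names other than TempStream are illustrative), RoomTempStream below is never defined explicitly: its schema is inferred from the first query's projection, and the second query can still consume it.

```sql
define stream TempStream (deviceID long, roomNo int, temp double);

from TempStream
select roomNo, temp
insert into RoomTempStream;

from RoomTempStream[temp > 30.0]
select roomNo, temp
insert into HighRoomTempStream;
```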
Query projection
SiddhiQL supports the following for query projection.
...
Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.
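As an illustrative sketch of query projection, the following renames attributes and computes a new value from a mathematical expression (the output stream name and the Fahrenheit conversion are only examples):

```sql
from TempStream
select roomNo, deviceID, (temp * 9 / 5) + 32 as tempInFahrenheit
insert into FahrenheitTempStream;
```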
...
Output event categories
Window output can be manipulated based on event categories, i.e., current and expired events. Use the following keywords with the output stream to manipulate the output.
...
```sql
from TempStream#window.time(1 min)
select *
insert expired events into DelayedTempStream;
```
Aggregate functions
Aggregate functions can be used with windows to perform aggregate calculations within the defined window.
Inbuilt aggregate functions
Siddhi supports the following inbuilt aggregate functions.
...
```sql
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;
```
Group by
Group by allows you to group aggregate calculations based on the specified attributes.
...
```sql
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo
group by roomNo
having avgTemp > 30
insert into AlertStream;
```
Output rate limiting
Output rate limiting allows queries to emit events periodically based on the condition specified.
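As a sketch, assuming the standard SiddhiQL 3.x `output ... every ...` clause, the following emits only the last matching event in each 10-minute interval; the output stream name is illustrative.

```sql
from TempStream
select roomNo, temp
output last every 10 min
insert into LowRateTempStream;
```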
...
```sql
from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp]
    within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
```
Logical pattern
A pattern not only matches events arriving in temporal order, but can also correlate events that have logical relationships.
...
```sql
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, tempSet double);

from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo==roomNo and e1.tempSet <= temp]
    or e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2.temp as roomTemp
having e3 is null
insert into AlertStream;
```
Counting pattern
Counting patterns enable you to match multiple events based on the same matching condition. The expected number of events can be limited using the following postfixes.
...
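As a hedged sketch, assuming the `<min:max>` count postfix of SiddhiQL 3.x, the following matches between one and five TempStream events for the room identified by the regulator event; the output stream name and indexed access via e2[0] are assumptions for illustration.

```sql
from every e1=RegulatorStream -> e2=TempStream[e1.roomNo == roomNo]<1:5>
select e1.roomNo, e2[0].temp as firstTemp
insert into TempReportStream;
```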
```sql
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
```
Logical sequence
A sequence not only matches consecutive events arriving in temporal order, but can also correlate events that have logical relationships.
...
```sql
define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);

from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;
```
Counting sequence
Counting sequences enable you to match multiple consecutive events based on the same matching condition. The expected number of events can be limited using the following postfixes.
...
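As a sketch, assuming the regex-style `+` (one or more) postfix for sequences in SiddhiQL 3.x, the following matches one or more consecutive readings at or above the initial temperature, followed by a reading below it; the output stream name is illustrative.

```sql
from every e1=TempStream, e2=TempStream[e1.temp <= temp]+, e3=TempStream[temp < e1.temp]
select e1.temp as initialTemp, e3.temp as finalTemp
insert into PeakTempStream;
```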
A partition key can be defined using a categorical (string) attribute of the input event stream (a variable partition), or by defining separate ranges over numerical attributes of the input event stream (a range partition).
...
Variable partition
Partitions using categorical (string) attributes adhere to the following syntax.
...
```sql
partition with ( deviceID of TempStream )
begin
    from TempStream#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into DeviceTempStream
end;
```
Range partition
Partitions using numerical attributes adhere to the following syntax.
...
```sql
partition with ( roomNo >= 1030 as 'serverRoom' or
                 roomNo < 1030 and roomNo >= 330 as 'officeRoom' or
                 roomNo < 330 as 'lobby' of TempStream )
begin
    from TempStream#window.time(10 min)
    select roomNo, deviceID, avg(temp) as avgTemp
    insert into AreaTempStream
end;
```
Inner streams
Inner streams allow query instances of a partition to communicate with other query instances of the same partition. Inner streams are denoted by a "#" prefix, and they cannot be accessed outside of the partition block.
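A minimal sketch of an inner stream, assuming the Siddhi 3.x partition syntax shown earlier in this guide (the output stream name is illustrative): the first query publishes to #AvgTempStream, which only queries inside the same partition instance can consume.

```sql
partition with ( deviceID of TempStream )
begin
    from TempStream#window.time(1 min)
    select roomNo, deviceID, avg(temp) as avgTemp
    insert into #AvgTempStream;

    from #AvgTempStream[avgTemp > 30.0]
    select roomNo, deviceID, avgTemp
    insert into HighAvgTempStream
end;
```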
...
An event table allows Siddhi to work with stored events; it can be viewed as a stored version of an event stream, or as a table of events. By default events are stored in-memory, and Siddhi also provides an extension to work with data/events stored in RDBMS data stores.
Event table definition
Event Table Definition defines the event table schema. An Event Table Definition contains a unique name and a set of typed attributes with uniquely identifiable names within the table.
...
```sql
define table RoomTypeTable (roomNo int, type string);
```
Indexing event table
An event table can be indexed for fast event access using the "IndexBy" annotation. With "IndexBy", only one attribute can be indexed; when indexed, the table uses a map data structure to hold the events. Therefore, if multiple events with the same index value are inserted into the event table, only the last inserted event remains in the table.
...
```sql
@IndexBy('roomNo')
define table RoomTypeTable (roomNo int, type string);
```
Hazelcast event table
Event tables also support persisting event data in a distributed manner using Hazelcast in-memory data grids. This functionality can be enabled using the "From" annotation, and the connection instructions for the Hazelcast cluster can also be assigned to the event table through the same annotation.
Info: Connection instructions that can be used with the "From" annotation include cluster.name: the Hazelcast cluster/group name [Optional] (e.g., cluster.name='cluster_a').
...
```sql
@from(eventtable = 'hazelcast')
define table RoomTypeTable(roomNo int, type string);
```
```sql
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a')
define table RoomTypeTable(roomNo int, type string);
```
```sql
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a', cluster.addresses='192.168.1.1:5700,192.168.1.2:5700')
define table RoomTypeTable(roomNo int, type string);
```
RDBMS event table
An event table can be backed by an RDBMS event store using the "From" annotation. With "From", the data source or the connection instructions can be assigned to the event table. The RDBMS table name can be different from the event table name defined in Siddhi; Siddhi always refers to the defined event table name. However, the defined event table name cannot be the same as an already existing stream name, since syntactically both are treated the same within the Siddhi Query Language.
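For example (based on the cached variant shown further below, minus the cache elements; the data source and table names are illustrative):

```sql
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable')
define table RoomTypeTable (roomNo int, type string);
```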
...
Caching events with RDBMS event table
Several caches can be used with RDBMS-backed event tables in order to reduce I/O operations and improve their performance. Currently, all cache implementations provide size-based algorithms. Caches can be added using the "cache" element, and the size of the cache can be defined using the "cache.size" element of the "From" annotation.
...
```sql
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', cache='LRU', cache.size='3000')
define table RoomTypeTable (roomNo int, type string);
```
Using Bloom filters
A Bloom filter is an algorithm or approach that can be used to perform quick searches. If you apply a Bloom filter to a data set and carry out an isAvailable check on that specific Bloom filter instance, an accurate answer is returned if the search item is not available. This improves the performance of updates, joins, and isAvailable checks.
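As a hedged sketch, Bloom filters are typically switched on through the same "From" annotation used for the RDBMS event table; the bloom.filters element name and its value are assumptions here, and the data source and table names are illustrative.

```sql
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', bloom.filters='enable')
define table RoomTypeTable (roomNo int, type string);
```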
...
```sql
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);

define window TempWindow(deviceID long, roomNo int, temp double) time(1 min);

from TempStream[temp > 30.0]
insert into TempWindow;

from TempWindow join RegulatorStream[isOn == false]#window.length(1) as R
    on TempWindow.roomNo == R.roomNo
select TempWindow.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;
```
Event trigger
Triggers allow you to create events periodically based on time, or when Siddhi starts. An event trigger generates events on an event stream that has the same name as the trigger, with a single attribute named "triggered_time" of type long.
Event trigger definition
The event trigger definition defines the event triggering interval, following the syntax below.
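In addition to the cron form shown below, a trigger can fire at a fixed interval or once at start-up. This sketch assumes the `at every <interval>` and `at 'start'` forms of Siddhi 3.x; the trigger names are illustrative.

```sql
define trigger FiveMinTriggerStream at every 5 min;
define trigger InitTriggerStream at 'start';
```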
...
```sql
define trigger FiveMinTriggerStream at '0 15 10 ? * MON-FRI';
```
Eval Script
...
Siddhi logger
The Siddhi logger is used to log arriving events at different logger priorities such as INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE.
The following syntax is used.
```sql
<void> log(<string> priority, <string> logMessage, <bool> isEventLogged)
```
The parameters used in the query are as follows.
Parameter | Description
---|---
priority | The logging priority. Possible values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE. If no value is specified for this parameter, INFO is printed as the priority by default.
logMessage | This parameter allows you to specify a message to be printed in the log.
isEventLogged | This parameter specifies whether the event body should be included in the log. Possible values are true and false. If no value is specified, the event body is not printed in the log by default.
The following examples illustrate the variations of the Siddhi logger.
The following query logs the event with the INFO logging priority, because no priority is specified.

```sql
from StockStream#log()
select *
insert into OutStream;
```

The following query logs the event with the INFO logging priority (because no priority is specified) and the text "test message".

```sql
from StockStream#log('test message')
select *
insert into OutStream;
```

The following query logs the event with the INFO logging priority, because no priority is specified. The event itself is printed in the log.

```sql
from StockStream#log(true)
select *
insert into OutStream;
```

The following query logs the event with the INFO logging priority (because no priority is specified) and the text "test message". The event itself is printed in the log.

```sql
from StockStream#log('test message', true)
select *
insert into OutStream;
```

The following query logs the event with the WARN logging priority and the text "test message".

```sql
from StockStream#log('warn', 'test message')
select *
insert into OutStream;
```

The following query logs the event with the WARN logging priority and the text "test message". The event itself is printed in the log.

```sql
from StockStream#log('warn', 'test message', true)
select *
insert into OutStream;
```
Eval script
Eval script allows Siddhi to process events using functions defined in other programming languages. Eval script functions can be defined in the same way as event tables or streams, and referred to in queries just like Siddhi's inbuilt functions.
Eval script definition
The eval script definition defines the function operation, following the syntax below.
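The general shape, as a sketch consistent with the Scala example that follows (the angle-bracket placeholders are illustrative), is:

```sql
define function <function name>[<language name>] return <return type> {
    <operation of the function>
};
```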
...
```sql
define function concatFn[Scala] return string {
    var concatenatedString = ""
    for (i <- 0 until data.length) {
        concatenatedString += data(i).toString
    }
    concatenatedString
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream
select concatFn(roomNo, '-', deviceID) as id, temp
insert into DeviceTempStream;
```
Siddhi extensions
Siddhi supports an extension architecture that allows custom code and functions to be incorporated with Siddhi in a seamless manner. Extensions follow the syntax below.
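Extension functions are referenced with a namespace prefix, as illustrated by the example further below (foo:unique). The general form is a sketch:

```sql
<namespace>:<function name>(<parameter1>, <parameter2>, ...)
```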
...
```sql
from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote;
```
Extension types
Siddhi supports the following five types of extensions:
...
- math: Supporting mathematical operations
- str: Supporting String operations
- geo: Supporting geocode operations
- regex: Supporting regular expression operations
- time: Supporting time expression operations
- ml: Supporting Machine Learning expression operations
- timeseries: Supporting Time Series operations
- kf (Kalman Filter): Supporting filtering capabilities by detecting outliers in the data.
- map: Supporting sending a map object within Siddhi stream definitions and using it inside queries.
- reorder: Supporting reordering of events from an unordered event stream using the K-Slack algorithm.
...
- geo: Supporting geographical processing operations
- r: Supporting R executions
- nlp: Supporting Natural Language Processing expression operations
- pmml: Supporting Predictive Model Markup Language expression operations
You can get them from https://github.com/wso2-gpl/siddhi
...