This guide explains, with examples, how to use Siddhi Query Language 3.0 with WSO2 CEP.

...

The execution logic of Siddhi is composed as an execution plan, and all of the above language constructs can be written as a script in an execution plan. Each construct should be separated by a semicolon (;).

Event stream

An event stream is a sequence of events with a defined schema. One or more event streams can be imported and manipulated using queries in order to identify complex event conditions, and new event streams are created to notify query responses.

Event stream definition

The event stream definition defines the event stream schema. An event stream definition contains a unique name and a set of attributes assigned specific types, with uniquely identifiable names within the stream.

...

Inferred stream: Here, RoomTempStream is an inferred stream, i.e., RoomTempStream can be used as an input stream for another query without explicitly defining its event stream definition, because that definition is inferred from the above query.

Query projection

SiddhiQL supports the following for query projection.

...

Function parameters can be attributes (int, long, float, double, string, bool, object), results of other functions, results of mathematical or logical expressions, or time parameters.

...

Output event categories

Window output can be manipulated based on event categories, i.e., current and expired events. Use the following keywords with the output stream to manipulate the output.

...

Code Block
from TempStream#window.time(1 min)
select *
insert expired events into DelayedTempStream
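
To make the difference between current and expired events concrete, the following is a minimal Python sketch (not SiddhiQL) of a sliding time window. The `TimeWindow` class and its `add` method are hypothetical names used only for illustration. The sketch assumes that events expire only when a later event arrives, whereas a real engine would typically expire them on a timer.

```python
from collections import deque


class TimeWindow:
    """Toy sliding time window; a hypothetical helper, not a Siddhi API."""

    def __init__(self, length_ms):
        self.length_ms = length_ms
        self.events = deque()  # (timestamp_ms, payload), oldest first

    def add(self, timestamp_ms, payload):
        """Return the (current, expired) event lists caused by this arrival."""
        expired = []
        # Events that arrived at least length_ms ago leave the window as expired events.
        while self.events and self.events[0][0] <= timestamp_ms - self.length_ms:
            expired.append(self.events.popleft()[1])
        self.events.append((timestamp_ms, payload))
        current = [payload]  # the arriving event itself is a current event
        return current, expired


window = TimeWindow(length_ms=60_000)  # comparable to #window.time(1 min)
delayed = []  # plays the role of DelayedTempStream
for ts, temp in [(0, 21.0), (30_000, 22.5), (70_000, 23.0)]:
    current, expired = window.add(ts, temp)
    delayed.extend(expired)  # keep only expired events, as in the query above
```

In this sketch only the first reading ends up in `delayed`, because it is the only one that has been in the window for a full minute by the time the last event arrives.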

Aggregate functions

Aggregate functions can be used with windows to perform aggregate calculations within the defined window. 

Inbuilt aggregate functions

Siddhi supports the following inbuilt aggregate functions.

...

Code Block
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert all events into AvgTempStream;

Group by

Group by allows us to group the aggregation by the specified group-by attributes.

...

Code Block
from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo
group by roomNo
having avgTemp > 30
insert into AlertStream;
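
As a rough illustration of what the group by and having clauses in the query above compute, here is a small Python sketch. It is a simplification: it works on a fixed list of events assumed to be inside the window at one moment, and the function name `avg_temp_by_room` is hypothetical.

```python
from collections import defaultdict


def avg_temp_by_room(events, threshold=30.0):
    """Average temp per roomNo, keeping only groups whose average exceeds threshold."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for event in events:
        sums[event["roomNo"]] += event["temp"]
        counts[event["roomNo"]] += 1
    averages = {room: sums[room] / counts[room] for room in sums}
    # Similar in spirit to: having avgTemp > 30
    return {room: avg for room, avg in averages.items() if avg > threshold}


events_in_window = [
    {"roomNo": 101, "temp": 29.0},
    {"roomNo": 101, "temp": 33.0},
    {"roomNo": 202, "temp": 25.0},
]
alerts = avg_temp_by_room(events_in_window)
```

Room 101 averages 31.0 and passes the filter, while room 202 averages 25.0 and is dropped.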

Output rate limiting

Output rate limiting allows queries to emit events periodically based on the condition specified.

...

Code Block
from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ]
	within 10 min
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;

Logical pattern

Patterns not only match events arriving in temporal order, but can also correlate events that have logical relationships.

...

Code Block
define stream TempStream(deviceID long, roomNo int, temp double);

define stream RegulatorStream(deviceID long, roomNo int, tempSet double);
 
from every( e1=RegulatorStream ) -> e2=TempStream[e1.roomNo==roomNo and e1.tempSet <= temp ] or e3=RegulatorStream[e1.roomNo==roomNo]
select e1.roomNo, e2.temp as roomTemp
having e3 is null
insert into AlertStream;

Counting pattern

Counting patterns enable us to match multiple events based on the same matching condition. The expected number of events can be limited using the following postfix.

...

Code Block
from every e1=TempStream, e2=TempStream[e1.temp + 1 < temp ]
select e1.temp as initialTemp, e2.temp as finalTemp
insert into AlertStream;
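
The sequence query above looks at strictly consecutive events. As an approximation of that behaviour, the following Python sketch pairs each temperature reading with the one that immediately follows it and keeps the pairs that rise by more than one degree. The function name `rising_pairs` is hypothetical, and the sketch ignores details of how the engine manages partial matches.

```python
def rising_pairs(temps, min_rise=1.0):
    """Return (initialTemp, finalTemp) for each consecutive pair rising by more than min_rise."""
    matches = []
    for previous, following in zip(temps, temps[1:]):
        if previous + min_rise < following:  # mirrors: e1.temp + 1 < temp
            matches.append((previous, following))
    return matches


alerts = rising_pairs([20.0, 21.5, 21.8, 23.0, 22.0])
```

Here the pairs (20.0, 21.5) and (21.8, 23.0) qualify; the other consecutive pairs do not rise by more than one degree.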

Logical sequence

Sequences not only match consecutive events arriving in temporal order, but can also correlate events that have logical relationships.

...

Code Block
define stream TempStream(deviceID long, temp double);
define stream HumidStream(deviceID long, humid double);
define stream RegulatorStream(deviceID long, isOn bool);
 
from every e1=RegulatorStream, e2=TempStream and e3=HumidStream
select e2.temp, e3.humid
insert into StateNotificationStream;

Counting sequence

Counting sequences enable us to match multiple consecutive events based on the same matching condition. The expected number of events can be limited using the following postfix.

...

A partition key can be defined in two ways: as a variable partition, using a categorical (string) attribute of the input event stream, or as a range partition, by defining separate ranges when the partition needs to be based on numerical attributes of the input event stream.

...

Variable partition 

Partition using categorical (string) attributes will adhere to the following syntax.

...

Code Block
partition with ( deviceID of TempStream )
begin
	from TempStream#window.length(10)
	select roomNo, deviceID, max(temp) as maxTemp
	insert into DeviceTempStream
end;

Range partition

Partition using numerical attributes will adhere to the following syntax.

...

Code Block
partition with ( roomNo>=1030 as 'serverRoom' or roomNo<1030 and roomNo>=330 as 'officeRoom' or roomNo<330 as 'lobby' of TempStream )
begin
	from TempStream#window.time(10 min)
	select roomNo, deviceID, avg(temp) as avgTemp 
	insert into AreaTempStream
end;
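
The range conditions in the partition above effectively map each room number to one of three labels. The following Python sketch shows that mapping and how events would be split by it. The names `area_of` and `partition_events` are hypothetical and only illustrate the idea; they are not part of Siddhi.

```python
def area_of(room_no):
    """Map a room number to the label used in the range partition above."""
    if room_no >= 1030:
        return "serverRoom"
    if room_no >= 330:  # roomNo < 1030 and roomNo >= 330
        return "officeRoom"
    return "lobby"  # roomNo < 330


def partition_events(events):
    """Group events by partition label; each group would be processed independently."""
    partitions = {}
    for event in events:
        partitions.setdefault(area_of(event["roomNo"]), []).append(event)
    return partitions


parts = partition_events([
    {"roomNo": 1200, "temp": 18.0},
    {"roomNo": 400, "temp": 24.0},
    {"roomNo": 12, "temp": 22.0},
    {"roomNo": 1030, "temp": 19.0},
])
```

Because the three ranges cover every integer without overlapping, each event lands in exactly one partition.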

Inner streams

Inner streams allow query instances of a partition to communicate with other query instances of the same partition. Inner streams are denoted by a "#" in front of their names, and they cannot be accessed outside the partition block.

...

An event table allows Siddhi to work with stored events, and can be viewed as a stored version of an event stream or as a table of events. By default, events are stored in memory. Siddhi also provides an extension to work with data/events stored in RDBMS data stores.

Event table definition

The event table definition defines the event table schema. An event table definition contains a unique name and a set of typed attributes with uniquely identifiable names within the table.

...

Code Block
define table RoomTypeTable (roomNo int, type string);

Indexing event table

An event table can be indexed for fast event access using the "IndexBy" annotation. With "IndexBy", only one attribute can be indexed, and when indexed, the table uses a map data structure to hold the events. Therefore, if multiple events with the same index value are inserted into the event table, only the last inserted event remains in the table.

...

Code Block
@IndexBy('roomNo')
define table RoomTypeTable (roomNo int, type string);
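
The consequence of holding indexed events in a map can be sketched in a few lines of Python. The `IndexedTable` class below is a hypothetical stand-in, not a Siddhi class; it only shows why a second event with the same index value replaces the first.

```python
class IndexedTable:
    """Toy event table indexed by a single attribute; hypothetical, for illustration only."""

    def __init__(self, index_attribute):
        self.index_attribute = index_attribute
        self.rows = {}  # index value -> most recently inserted event

    def insert(self, event):
        # A later event with the same index value overwrites the earlier one.
        self.rows[event[self.index_attribute]] = event

    def find(self, index_value):
        # Direct lookup by index value, without scanning every event.
        return self.rows.get(index_value)


table = IndexedTable("roomNo")
table.insert({"roomNo": 101, "type": "office"})
table.insert({"roomNo": 102, "type": "lobby"})
table.insert({"roomNo": 101, "type": "server"})  # replaces the first event for room 101
```

After these three inserts the table holds two events, and a lookup for room 101 returns the one with type "server".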

Hazelcast event table

Event tables also support persisting event data in a distributed manner using Hazelcast in-memory data grids. This functionality can be enabled using the "From" annotation, which can also be used to assign the connection instructions for the Hazelcast cluster to the event table.

Info

Connection instructions that can be used with the "From" annotation:

cluster.name : Hazelcast cluster/group name [Optional] (e.g., cluster.name='cluster_a').

cluster.password : Hazelcast cluster/group password [Optional] (e.g., cluster.password='pass@cluster_a').

cluster.addresses : Hazelcast cluster addresses (ip:port) as a comma-separated string [Optional, client mode only] (e.g., cluster.addresses='192.168.1.1:5700,192.168.1.2:5700').

well.known.addresses : Hazelcast WKAs (ip) as a comma-separated string [Optional, server mode only] (e.g., well.known.addresses='192.168.1.1,192.168.1.2').

collection.name : Hazelcast collection object name [Optional, can be used to share a single table between multiple EPs] (e.g., collection.name='stockTable').

...

 

Code Block
@from(eventtable = 'hazelcast')
define table RoomTypeTable(roomNo int, type string);

E.g. 2. Create an event table named RoomTypeTable with the attributes room number as int and type as string, backed by a new Hazelcast instance in a new Hazelcast cluster.

Code Block
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a')
define table RoomTypeTable(roomNo int, type string);

E.g. 3. Create an event table named RoomTypeTable with the attributes room number as int and type as string, backed by an existing Hazelcast instance in an existing Hazelcast cluster.

Code Block
@from(eventtable = 'hazelcast', cluster.name = 'cluster_a', cluster.password = 'pass@cluster_a', cluster.addresses='192.168.1.1:5700,192.168.1.2:5700')
define table RoomTypeTable(roomNo int, type string);

 

RDBMS event table

An event table can be backed by an RDBMS event store using the "From" annotation. With "From", the data source or the connection instructions can be assigned to the event table. The RDBMS table name can be different from the event table name defined in Siddhi, and Siddhi will always refer to the defined event table name. However, the defined event table name cannot be the same as an already existing stream name, since syntactically both are considered the same within the Siddhi query language.

...

Caching events with RDBMS event table

Several caches can be used with RDBMS-backed event tables in order to reduce I/O operations and improve their performance. Currently, all cache implementations provide size-based algorithms. Caches can be added using the "cache" element, and the size of the cache can be defined using the "cache.size" element of the "From" annotation.

...

Code Block
@From(eventtable='rdbms', datasource.name='AnalyticsDataSource', table.name='RoomTable', cache='LRU', cache.size='3000')
define table RoomTypeTable (roomNo int, type string);
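
To show how a size-based cache such as the 'LRU' option above can reduce reads from the backing store, here is a generic Python sketch of a least-recently-used cache. It is not Siddhi's implementation, which this guide does not describe; the `LRUCache` class and the `read_room_type` function (standing in for a database read) are hypothetical.

```python
from collections import OrderedDict


class LRUCache:
    """Generic size-bounded least-recently-used cache; for illustration only."""

    def __init__(self, size):
        self.size = size
        self.entries = OrderedDict()  # least recently used entry first

    def get(self, key, load_from_store):
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as most recently used
            return self.entries[key]
        value = load_from_store(key)  # cache miss: one read from the store
        self.entries[key] = value
        if len(self.entries) > self.size:
            self.entries.popitem(last=False)  # evict the least recently used entry
        return value


store_reads = []  # records every simulated read from the RDBMS


def read_room_type(room_no):
    store_reads.append(room_no)
    return "type-%d" % room_no


cache = LRUCache(size=2)
for room_no in [1, 2, 1, 3, 2]:
    cache.get(room_no, read_room_type)
```

Of the five lookups, only four reach the store: the second lookup of room 1 is served from the cache, while room 2 has to be read again because it was evicted when room 3 was loaded.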

Using Bloom filters

A Bloom filter is a probabilistic data structure that can be used to perform quick searches. If you apply a Bloom filter to a data set and carry out an isAvailable check on that specific Bloom filter instance, an accurate answer is returned if the search item is not available. This speeds up updates, joins, and isAvailable checks.
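
The property described above, that a negative answer is always accurate, can be seen in a minimal Python sketch of a Bloom filter. This is a generic textbook version, not the implementation used by Siddhi; the class name, the bit-array size, and the use of SHA-256 as the hash are all assumptions made for illustration.

```python
import hashlib


class BloomFilter:
    """Generic Bloom filter sketch; sizes and hash choice are illustrative assumptions."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, item):
        # Derive num_hashes bit positions by salting the item with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode("utf-8")).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, item):
        for position in self._positions(item):
            self.bits[position] = True

    def might_contain(self, item):
        # False is always correct; True may occasionally be a false positive.
        return all(self.bits[position] for position in self._positions(item))


room_filter = BloomFilter()
for room_no in (101, 102, 103):
    room_filter.add(room_no)
```

When `might_contain` returns `False`, the item was definitely never added, so a lookup in the backing table can be skipped. A `True` result still needs to be confirmed against the table.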

...

Code Block
define stream TempStream(deviceID long, roomNo int, temp double);
define stream RegulatorStream(deviceID long, roomNo int, isOn bool);
define window TempWindow(deviceID long, roomNo int, temp double) time(1 min);

from TempStream[temp > 30.0]
insert into TempWindow;

from TempWindow
join RegulatorStream[isOn == false]#window.length(1) as R
on TempWindow.roomNo == R.roomNo
select TempWindow.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;

Event trigger

Triggers allow us to create events periodically based on time, and at Siddhi start. An event trigger generates events on an event stream that has the same name as the trigger and only one attribute, named "triggered_time", of type long.

Event trigger definition

The event trigger definition defines the event triggering interval using the following syntax.

...

Code Block
define trigger FiveMinTriggerStream at '0 15 10 ? * MON-FRI';

...

Siddhi logger

The Siddhi logger is used to log arriving events at different logging priorities such as INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE.

The following syntax is used.

Code Block
<void> log(<string> priority, <string> logMessage, <bool> isEventLogged)

The parameters used in the query are as follows.

Parameter : Description

priority : The logging priority. Possible values are INFO, DEBUG, WARN, FATAL, ERROR, OFF, and TRACE. If no value is specified for this parameter, INFO is printed as the priority by default.

logMessage : This parameter allows you to specify a message to be printed in the log.

isEventLogged : This parameter specifies whether the event body should be included in the log. Possible values are true and false. If no value is specified, the event body is not printed in the log by default.

The following examples illustrate the variations of the Siddhi logger.

e.g.,

  • The following query logs the event with the INFO logging priority. This is because the priority is not specified.

    Code Block
    from StockStream#log()
    select * 
    insert into OutStream;
  • The following query logs the event with the INFO logging priority (because the priority is not specified) and the test message text.

    Code Block
    from StockStream#log('test message')
    select * 
    insert into OutStream;
  • The following query logs the event with the INFO logging priority because a priority is not specified. The event itself is printed in the log.

    Code Block
    from StockStream#log(true)
    select * 
    insert into OutStream;
  • The following query logs the event with the INFO logging priority (because the priority is not specified) and the test message text. The event itself is printed in the log.

    Code Block
    from StockStream#log('test message', true)
    select * 
    insert into OutStream;
  • The following query logs the event with the WARN logging priority and the test message text.

    Code Block
    from StockStream#log('warn','test message')
    select * 
    insert into OutStream;
  • The following query logs the event with the WARN logging priority and the test message text.  The event itself is printed in the log.

    Code Block
    from StockStream#log('warn','test message',true)
    select * 
    insert into OutStream;

Eval script

Eval script allows Siddhi to process events using functions written in other programming languages. Eval script functions can be defined like event tables or streams, and referred to in queries in the same way as Siddhi's inbuilt functions.

Eval script definition

The eval script definition defines the function operation using the following syntax.

...

Code Block
define function concatFn[Scala] return string {
    var concatenatedString = ""
    for (i <- 0 until data.length) {
        concatenatedString += data(i).toString
    }
    concatenatedString
};

define stream TempStream(deviceID long, roomNo int, temp double);

from TempStream 
select concatFn(roomNo,'-',deviceID) as id, temp  
insert into DeviceTempStream;

Siddhi extensions

Siddhi provides an extension architecture that allows custom code and functions to be incorporated with Siddhi in a seamless manner. Extensions adhere to the following syntax:

...

Code Block
from StockExchangeStream[price >= 20]#window.foo:unique(symbol)
select symbol, price
insert into StockQuote;

Extension types

Siddhi supports the following five types of extensions:

...

  • math: Supporting mathematical operations 
  • str: Supporting String operations 
  • geo: Supporting geocode operations
  • regex: Supporting regular expression operations
  • time: Supporting time expression operations
  • ml: Supporting Machine Learning expression operations
  • timeseries: Supporting Time Series operations
  • kf (Kalman Filter): Supporting filtering capabilities by detecting outliers of the data.
  • map: Supporting sending a map object inside Siddhi stream definitions and using it inside queries.
  • reorder: Supporting reordering of events from an unordered event stream using the K-Slack algorithm.

...

  • geo: Supporting geographical processing operations   
  • r:  Supporting R executions
  • nlp: Supporting Natural Language Processing expression operations
  • pmml: Supporting Predictive Model Markup Language expression operations

You can get them from https://github.com/wso2-gpl/siddhi

...