Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

A Siddhi Application is a combination of multiple Siddhi executional elements. A Siddhi executional element can be a Siddhi Query or a Siddhi Partition. When defining a Siddhi application, you can specify a number of parallel instances to be created for each executional element, and how each executional element must be isolated for an SP instance. Based on this, the initial Siddhi application is divided into multiple Siddhi applications and deployed in different SP instances.

...

AnnotationDescription
@dist(execGroup='name of the group')

All the executional elements with the same execution group are executed in the same Siddhi application. When different execution groups are mentioned within the same distributed Siddhi application, WSO2 SP initiates a separate Siddhi Application per execution group. In each separated Siddhi application, only the executional elements assigned to the relevant execution group are executed.

Executional elements that have no execution group assigned to them are executed in a separate SP instance.

@dist (parallel='number of parallel instances’)

The number of instances in which the executional element must be executed in parallel. All the executional elements assigned to a specific execution group (i.e., via the @dist(execGroup) annotation) must have the same number of parallel instances specified. If there is a mismatch in the parallel instances specified for an execution group, an exception occurs.

When the number of parallel instances to be run is not given for the executional elements assigned to an execution group , only one Siddhi application is initiated for that execution group.

This can also be applied to sources. If a parallelism count is specified within a source annotation, a number of passthrough Siddhi applications equal to that count are generated and deployed.

Example

The following is a sample distributed Siddhi application.

Code Block
languagesql
@info(name = ‘query1') @dist(execGroup='group1')
from TempStream#window.time(2 min) 
select avg(temp) as avgTemp, roomNo, deviceID insert all events into AvgTempStream@App:name('Energy-Alert-App')
@App:description('Energy consumption and anomaly detection')

@source(type = 'http',  topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
define stream DevicePowerStream (type string, deviceID string, power int, roomID string);

@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}', @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);

@info(name = ‘query2'monitered-filter') @dist(execGroup='group1001')
from TempStreamDevicePowerStream[temptype > 30.0]#window.time(1 min) as T join 
RegulatorStream[isOn ==false]#window.length(1) as R on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action insert into RegulatorActionStream== 'monitored'] 
select deviceID, power, roomID 
insert current events into MonitoredDevicesPowerStream;

@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
partition with (deviceID of MonitoredDevicesPowerStream)
begin
@info(name = 'avg-calculator')
from MonitoredDevicesPowerStream#window.time(2 min) 
select deviceID, avg(power) as avgPower, roomID 
insert current events into #AvgPowerStream;

@info(name = ‘query3') @dist(execGroup='group1''power-increase-detector')
from every( e1 =TempStream )#AvgPowerStream -> e2 =TempStream[e1.roomNo==roomNo and  #AvgPowerStream[(e1.tempavgPower + 5) <= temp avgPower] within 10 min 
select e1.roomNodeviceID as deviceID, e1.tempavgPower as initialTempinitialPower, e2.tempavgPower as finalPower, e1.roomID finalTemp
insert current events into AlertStreamRisingPowerStream;
end;


@info(name = ‘query4'power-range-filter') @dist(execGroupparallel='group22' ,parallel execGroup='2003')
from TempStream RisingPowerStream[(roomNofinalPower >= 100] and
roomNoselect < 110) and temp > 40 ] select roomNo, temp insert into HighTempStream;

deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail 
insert current events into AlertStream; 

@info(name = ‘query5'internal-filter') @dist(execGroup='group3' , parallel=’2’)
partition with ( deviceID of TempStream )
begin
    from TempStream#window.time(1 min)
    select roomNo, deviceID, temp, avg(temp) as avgTemp
    insert into #AvgTempStream;

    from #AvgTempStream[avgTemp > 20]#window.length(10)
    select roomNo, deviceID, max(temp) as maxTemp
    insert into deviceTempStream;
end;

...

004')
from DevicePowerStream[type == 'internal'] 
select deviceID, power 
insert current events into InternaltDevicesPowerStream;

When above siddhi application is deployed it creates a distributed processing chain as depicted in the image below.

Image Added

As annotated in the Siddhi application, two passthough query groups are created to accept HTTP traffic and to send those events into the messaging layer. Other execution groups are created as per the given parallelism count. The execution group creation is summarized in the table below.

Execution Group

Number of Siddhi

Application Instances

Queries executed

group1001

1
monitered-filter

query1002

query2

query3

group22
power-increase-pattern

003

2

query4

group3

2query5
power-range-filter
0041
internal-filter