...
Annotation | Description |
---|---|
@dist(execGroup='name of the group') | All the executional elements with the same execution group are executed in the same Siddhi application. When different execution groups are mentioned within the same distributed Siddhi application, WSO2 SP initiates a separate Siddhi Application per execution group. In each separated Siddhi application, only the executional elements assigned to the relevant execution group are executed. Executional elements that have no execution group assigned to them are executed in a separate SP instance. |
@dist (parallel='number of parallel instances’) | The number of instances in which the executional element must be executed in parallel. All the executional elements assigned to a specific execution group (i.e., via the When the number of parallel instances to be run is not given for the executional elements assigned to an execution group , only one Siddhi application is initiated for that execution group. This can also be applied to sources. When sources are provided with parallelism count passthough Siddhi apps equal to the parallel count will be generated and deployed. |
Example
The following is a sample distributed Siddhi application.
Code Block | ||
---|---|---|
| ||
@info(name = ‘query1') @dist(execGroup='group1') from TempStream#window.time(2 min) select avg(temp) as avgTemp, roomNo, deviceID insert all events into AvgTempStream; @info(name = ‘query2') @dist(execGroup='group1') from TempStream[temp > 30.0]#window.time(1 min) as T join RegulatorStream[isOn ==false]#window.length(1) as R on T.roomNo == R.roomNo select T.roomNo, R.deviceID, 'start' as action insert into RegulatorActionStream@App:name('Energy-Alert-App') @App:description('Energy consumption and anomaly detection') @source(type = 'http', topic = 'device-power', @map(type = 'json')) define stream DevicePowerStream (type string, deviceID string, power int, roomID string); @sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}', @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. '))) define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string); @info(name = ‘query3'monitered-filter') @dist(execGroup='group1001') from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ] within 10 min select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStreamDevicePowerStream[type == 'monitored'] select deviceID, power, roomID insert current events into MonitoredDevicesPowerStream; @info(name = ‘query4'power-increase-pattern') @dist(execGroupparallel='group22' ,parallel execGroup='2002') frompartition TempStreamwith [(roomNodeviceID >=of 100MonitoredDevicesPowerStream) andbegin roomNo@info(name < 110) and temp > 40 ] select roomNo, temp insert into HighTempStream= 'avg-calculator') from MonitoredDevicesPowerStream#window.time(2 min) select deviceID, avg(power) as avgPower, roomID insert current events into #AvgPowerStream; @info(name = ‘query5'power-increase-detector') @dist(execGroup='group3' , parallel=’2’) partition with ( deviceID of TempStream ) begin from TempStream#window.time(1 min) select roomNo, deviceID, temp, avg(temp) as avgTemp insert into #AvgTempStream; from #AvgTempStream[avgTemp > 20]#window.length(10) select roomNo, deviceID, max(temp) as maxTemp insert into deviceTempStream; end; |
When this Siddhi application is deployed, it is executed as shown in the table below.
from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, e1.roomID
insert current events into RisingPowerStream;
end;
@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
from RisingPowerStream[finalPower > 100]
select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail
insert current events into AlertStream;
@info(name = 'internal-filter')@dist(execGroup='004')
from DevicePowerStream[type == 'internal']
select deviceID, power
insert current events into InternaltDevicesPowerStream; |
When above siddhi app is deployed it will create a distributed processing chain as depicted in below figure.
A passthough query group will be created to accept HTTP traffic and to send those events into messaging layer. Other execution groups will be created as per given parallelism count. Below table summarize execution group creation.
Execution Group | Number of Siddhi Application Instances | Queries executed | |||
---|---|---|---|---|---|
| 1 | monitered-filter | |||
| group2 2 | power-increase-pattern | |||
| 2 |
|
| 2 | query5 power-range-filter |
004 | 1 | internal-filter |