A Siddhi Application is a combination of multiple Siddhi executional elements. A Siddhi executional element can be a Siddhi Query or a Siddhi Partition. When defining a Siddhi application, you can specify a number of parallel instances to be created for each executional element, and how each executional element must be isolated for an SP instance. Based on this, the initial Siddhi application is divided into multiple Siddhi applications and deployed in different SP instances.
...
@App:name('Energy-Alert-App')
@App:description('Energy consumption and anomaly detection')
@source(type = 'http', topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
define stream DevicePowerStream (type string, deviceID string, power int, roomID string);
@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}', @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);
@info(name = 'monitered-filter')@dist(execGroup='001')
from DevicePowerStream[type == 'monitored']
select deviceID, power, roomID
insert current events into MonitoredDevicesPowerStream;
@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
partition with (deviceID of MonitoredDevicesPowerStream)
begin
@info(name = 'avg-calculator')
from MonitoredDevicesPowerStream#window.time(2 min)
select deviceID, avg(power) as avgPower, roomID
insert current events into #AvgPowerStream;
@info(name = 'power-increase-detector')
from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, e1.roomID
insert current events into RisingPowerStream;
end;
@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
from RisingPowerStream[finalPower > 100]
select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail
insert current events into AlertStream;
@info(name = 'internal-filter')@dist(execGroup='004')
from DevicePowerStream[type == 'internal']
select deviceID, power
insert current events into InternaltDevicesPowerStream;
When the above Siddhi application is deployed, it creates a distributed processing chain as depicted in the figure below.
As annotated in the Siddhi application, two passthrough query groups are created to accept HTTP traffic and to send those events into the messaging layer. The other execution groups are created as per the given parallelism count. The execution group creation is summarized in the table below.
Execution Group | Number of Siddhi Application Instances | Queries executed |
---|---|---|
001 | 1 | monitered-filter |
002 | 2 | power-increase-pattern |
003 | 2 | power-range-filter |
004 | 1 | internal-filter |
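
The mapping from `@dist` annotations to the rows of this table can be sketched in a few lines of Python. This is only an illustration of how the table is derived, not how the SP deployment logic is implemented. The `summarize` helper and the regular expressions are hypothetical names introduced here. The sketch assumes that a missing `parallel` value means a single instance, which matches rows 001 and 004, and that every query in an execution group carries the same parallelism.

```python
import re

# Annotation lines copied from the Siddhi application above.
ANNOTATED_QUERIES = [
    "@info(name = 'monitered-filter')@dist(execGroup='001')",
    "@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')",
    "@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')",
    "@info(name = 'internal-filter')@dist(execGroup='004')",
]

NAME_RE = re.compile(r"@info\(name\s*=\s*'([^']+)'\)")  # query or partition name
DIST_RE = re.compile(r"@dist\(([^)]*)\)")               # body of the @dist annotation
PAIR_RE = re.compile(r"(\w+)\s*=\s*'([^']*)'")          # key='value' pairs inside it


def summarize(lines):
    """Group query names by execGroup and record the instance count per group."""
    groups = {}
    for line in lines:
        name = NAME_RE.search(line).group(1)
        dist = dict(PAIR_RE.findall(DIST_RE.search(line).group(1)))
        group = dist["execGroup"]
        # Assumption: no 'parallel' key means one instance of the group.
        parallel = int(dist.get("parallel", "1"))
        # Assumption: all queries in a group share one parallelism value,
        # so the first value seen for the group is kept.
        entry = groups.setdefault(group, {"instances": parallel, "queries": []})
        entry["queries"].append(name)
    return groups


if __name__ == "__main__":
    for group, info in summarize(ANNOTATED_QUERIES).items():
        print(group, info["instances"], ", ".join(info["queries"]))
```

The two passthrough query groups that front the HTTP source are not covered by this sketch, because they come from the `@dist` annotation on the source rather than from an `execGroup` value.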