...

    @App:name('Energy-Alert-App')
    @App:description('Energy consumption and anomaly detection')

    @source(type = 'http', topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
    define stream DevicePowerStream (type string, deviceID string, power int, roomID string);

    @sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}', @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
    define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);

    @info(name = 'monitered-filter') @dist(execGroup='001')
    from DevicePowerStream[type == 'monitored']
    select deviceID, power, roomID
    insert current events into MonitoredDevicesPowerStream;

    @info(name = 'power-increase-pattern') @dist(parallel='2', execGroup='002')
    partition with (deviceID of MonitoredDevicesPowerStream)
    begin
        @info(name = 'avg-calculator')
        from MonitoredDevicesPowerStream#window.time(2 min)
        select deviceID, avg(power) as avgPower, roomID
        insert current events into #AvgPowerStream;

        @info(name = 'power-increase-detector')
        from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
        select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, e1.roomID
        insert current events into RisingPowerStream;
    end;

    @info(name = 'power-range-filter') @dist(parallel='2', execGroup='003')
    from RisingPowerStream[finalPower > 100]
    select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail
    insert current events into AlertStream;

    @info(name = 'internal-filter') @dist(execGroup='004')
    from DevicePowerStream[type == 'internal']
    select deviceID, power
    insert current events into InternaltDevicesPowerStream;

When the above Siddhi app is deployed, it creates a distributed processing chain as depicted in the figure below.
As annotated in the Siddhi app, two passthrough query groups are created to accept HTTP traffic and to send those events into the messaging layer. The other execution groups are created according to the given parallelism count. The table below summarizes the execution group creation.
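
The relationship between the `@dist` annotations and the resulting groups can be tallied with a short sketch. The Python below is illustrative only and is not part of Siddhi or its deployment tooling: it reads the `@dist` annotations copied from the app above and lists the parallelism requested for each execution group. It assumes a parallelism of 1 when `parallel` is omitted, and the helper names `dist_params` and `summarize_groups` are hypothetical.

```python
import re

# Query-level annotations copied from the Siddhi app above.
QUERY_ANNOTATIONS = [
    "@info(name = 'monitered-filter')@dist(execGroup='001')",
    "@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')",
    "@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')",
    "@info(name = 'internal-filter')@dist(execGroup='004')",
]
# The @dist element nested in the HTTP source annotation.
SOURCE_DIST = "@dist(parallel='2')"

# Assumption: a group runs as a single instance when `parallel` is omitted.
DEFAULT_PARALLEL = 1


def dist_params(annotation):
    """Return the key/value pairs inside the @dist(...) part of an annotation."""
    match = re.search(r"@dist\(([^)]*)\)", annotation)
    if match is None:
        return {}
    return dict(re.findall(r"(\w+)\s*=\s*'([^']*)'", match.group(1)))


def summarize_groups(annotations):
    """Map each execGroup to the parallelism requested for it."""
    groups = {}
    for annotation in annotations:
        params = dist_params(annotation)
        groups[params["execGroup"]] = int(params.get("parallel", DEFAULT_PARALLEL))
    return groups


passthrough_groups = int(dist_params(SOURCE_DIST).get("parallel", DEFAULT_PARALLEL))
execution_groups = summarize_groups(QUERY_ANNOTATIONS)

print("passthrough groups:", passthrough_groups)
for group, parallel in sorted(execution_groups.items()):
    print(f"execGroup {group}: parallel={parallel}")
```

For this app the sketch reports two passthrough groups for the HTTP source and a parallelism of 2 for groups 002 and 003. Groups 001 and 004 fall back to the assumed default of 1.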
...