
...

@App:name('Energy-Alert-App')
@App:description('Energy consumption and anomaly detection')

@source(type = 'http',  topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
define stream DevicePowerStream (type string, deviceID string, power int, roomID string);

@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}', @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);

@info(name = 'monitered-filter')@dist(execGroup='001')
from DevicePowerStream[type == 'monitored'] 
select deviceID, power, roomID 
insert current events into MonitoredDevicesPowerStream;

@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
partition with (deviceID of MonitoredDevicesPowerStream)
begin
@info(name = 'avg-calculator')
from MonitoredDevicesPowerStream#window.time(2 min) 
select deviceID, avg(power) as avgPower, roomID 
insert current events into #AvgPowerStream;

@info(name = 'power-increase-detector')
from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min 
select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, e1.roomID 
insert current events into RisingPowerStream;
end;


@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
from RisingPowerStream[finalPower > 100] 
select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail 
insert current events into AlertStream; 

@info(name = 'internal-filter')@dist(execGroup='004')
from DevicePowerStream[type == 'internal'] 
select deviceID, power 
insert current events into InternalDevicesPowerStream;

When the above Siddhi app is deployed, it creates a distributed processing chain as depicted in the figure below.

[Figure: distributed processing chain created for the Energy-Alert-App Siddhi app]

As annotated in the Siddhi app, two passthrough query groups are created to accept HTTP traffic and send those events to the messaging layer. The other execution groups are created according to the given parallelism count. The table below summarizes execution group creation.
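
The group counts described above can be tallied with a small script. The following is only an illustrative sketch in Python, not part of Siddhi or of any WSO2 tooling: the `summarize` function and the `passthrough` label are hypothetical names introduced here. It assumes that a query without an explicit `parallel` value runs as a single instance, and it reads only the `@dist` annotations copied from the app above.

```python
import re

# Annotation lines copied from the Siddhi app above (query bodies omitted).
APP = """
@source(type = 'http',  topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
@info(name = 'monitered-filter')@dist(execGroup='001')
@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
@info(name = 'internal-filter')@dist(execGroup='004')
"""

DIST = re.compile(r"@dist\(([^)]*)\)")    # captures the option list inside @dist(...)
PAIR = re.compile(r"(\w+)\s*=\s*'([^']*)'")  # captures key='value' pairs


def summarize(app_text):
    """Return {group label: instance count} for every @dist annotation found."""
    summary = {}
    for line in app_text.splitlines():
        match = DIST.search(line)
        if match is None:
            continue
        options = dict(PAIR.findall(match.group(1)))
        # Assumption: a missing 'parallel' option means one instance.
        parallel = int(options.get("parallel", "1"))
        if line.lstrip().startswith("@source"):
            # Hypothetical label for the passthrough groups created for the source.
            group = "passthrough"
        else:
            group = options["execGroup"]
        summary[group] = summary.get(group, 0) + parallel
    return summary


if __name__ == "__main__":
    for group, count in summarize(APP).items():
        print(group, count)
```

Under these assumptions the tally gives two passthrough instances for the HTTP source, one instance each for groups `001` and `004`, and two instances each for groups `002` and `003`.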

...