...
    @App:name('Energy-Alert-App')
    @App:description('Energy consumption and anomaly detection')

    @source(type = 'http', topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
    define stream DevicePowerStream (type string, deviceID string, power int, roomID string);

    @sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}',
          @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
    define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);

    @info(name = 'monitered-filter')@dist(execGroup='001')
    from DevicePowerStream[type == 'monitored']
    select deviceID, power, roomID
    insert current events into MonitoredDevicesPowerStream;

    @info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
    partition with (deviceID of MonitoredDevicesPowerStream)
    begin
        @info(name = 'avg-calculator')
        from MonitoredDevicesPowerStream#window.time(2 min)
        select deviceID, avg(power) as avgPower, roomID
        insert current events into #AvgPowerStream;

        @info(name = 'power-increase-detector')
        from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min
        select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, e1.roomID
        insert current events into RisingPowerStream;
    end;

    @info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
    from RisingPowerStream[finalPower > 100]
    select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail
    insert current events into AlertStream;

    @info(name = 'internal-filter')@dist(execGroup='004')
    from DevicePowerStream[type == 'internal']
    select deviceID, power
    insert current events into InternaltDevicesPowerStream;

When the above Siddhi application is deployed, it creates a distributed processing chain as depicted in the figure below.
As annotated in the Siddhi application, two passthrough query groups are created to accept HTTP traffic and to send those events into the messaging layer. The other execution groups are created as per the given parallelism count. The execution group creation is summarized in the table below.
Execution Group | Number of Siddhi Application Instances | Queries executed |
---|---|---|
001 | 1 | monitered-filter |
002 | 2 | power-increase-pattern |
003 | 2 | power-range-filter |
004 | 1 | internal-filter |
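
The rows of this table can be derived from the query-level `@dist` annotations alone. The following Python sketch illustrates that mapping; it is not how the product computes it. The `summarize` helper and the `QUERY_ANNOTATIONS` list are hypothetical names introduced here, the annotation strings are copied from the Siddhi application above, and the default of one instance when `parallel` is omitted is an assumption that matches the rows for groups 001 and 004.

```python
import re

# Query-level annotations copied from the Siddhi application above.
QUERY_ANNOTATIONS = [
    "@info(name = 'monitered-filter')@dist(execGroup='001')",
    "@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')",
    "@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')",
    "@info(name = 'internal-filter')@dist(execGroup='004')",
]


def summarize(annotations):
    """Return (execution group, instance count, query name) for each query.

    Assumption: a query without an explicit 'parallel' attribute runs as a
    single instance.
    """
    rows = []
    for text in annotations:
        # Query name from @info(name = '...').
        name = re.search(r"@info\(name\s*=\s*'([^']+)'\)", text).group(1)
        # Key/value attributes inside @dist(...).
        dist_body = re.search(r"@dist\(([^)]*)\)", text).group(1)
        attrs = dict(re.findall(r"(\w+)\s*=\s*'([^']*)'", dist_body))
        rows.append((attrs["execGroup"], int(attrs.get("parallel", "1")), name))
    return rows


for group, instances, query in summarize(QUERY_ANNOTATIONS):
    print(f"{group} | {instances} | {query}")
```

The sketch covers only the four annotated queries. The passthrough query groups created for the HTTP source are not listed in the table, so they are left out here as well.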