A Siddhi Application is a combination of multiple Siddhi executional elements. A Siddhi executional element can be a Siddhi Query or a Siddhi Partition. When defining a Siddhi application, you can specify a number of parallel instances to be created for each executional element, and how each executional element must be isolated for an SP instance. Based on this, the initial Siddhi application is divided into multiple Siddhi applications and deployed in different SP instances.

...

@App:name('Energy-Alert-App')
@App:description('Energy consumption and anomaly detection')

@source(type = 'http',  topic = 'device-power', @map(type = 'json'), @dist(parallel='2'))
define stream DevicePowerStream (type string, deviceID string, power int, roomID string);

@sink(type = 'email', to = '{{autorityContactEmail}}', username = 'john', address = 'john@gmail.com', password = 'test', subject = 'High power consumption of {{deviceID}}', @map(type = 'text', @payload('Device ID: {{deviceID}} of room : {{roomID}} power is consuming {{finalPower}}kW/h. ')))
define stream AlertStream (deviceID string, roomID string, initialPower double, finalPower double, autorityContactEmail string);

@info(name = 'monitered-filter')@dist(execGroup='001')
from DevicePowerStream[type == 'monitored'] 
select deviceID, power, roomID 
insert current events into MonitoredDevicesPowerStream;

@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
partition with (deviceID of MonitoredDevicesPowerStream)
begin
@info(name = 'avg-calculator')
from MonitoredDevicesPowerStream#window.time(2 min) 
select deviceID, avg(power) as avgPower, roomID 
insert current events into #AvgPowerStream;

@info(name = 'power-increase-detector')
from every e1 = #AvgPowerStream -> e2 = #AvgPowerStream[(e1.avgPower + 5) <= avgPower] within 10 min 
select e1.deviceID as deviceID, e1.avgPower as initialPower, e2.avgPower as finalPower, e1.roomID 
insert current events into RisingPowerStream;
end;


@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
from RisingPowerStream[finalPower > 100] 
select deviceID, roomID, initialPower, finalPower, 'no-reply@powermanagement.com' as autorityContactEmail 
insert current events into AlertStream; 

@info(name = 'internal-filter')@dist(execGroup='004')
from DevicePowerStream[type == 'internal'] 
select deviceID, power 
insert current events into InternaltDevicesPowerStream;

When the above Siddhi application is deployed, it creates a distributed processing chain as depicted in the figure below.

[Figure: distributed processing chain created for Energy-Alert-App]

As annotated in the Siddhi application, two passthrough query groups are created to accept HTTP traffic and to send those events into the messaging layer. Other execution groups are created as per the given parallelism count. The execution group creation is summarized in the table below.

Execution Group   Number of Siddhi Application Instances   Queries executed
001               1                                        monitered-filter
002               2                                        power-increase-pattern
003               2                                        power-range-filter
004               1                                        internal-filter
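
The mapping from annotations to instance counts can be illustrated with a short Python sketch. This is not how SP itself divides the application. The helper `summarize_exec_groups` and the constant `APP_ANNOTATIONS` are hypothetical names introduced here, and the sketch assumes that an element with no `parallel` value in `@dist` runs as a single instance. It does not model the two passthrough query groups.

```python
import re

# Hypothetical helper, not part of SP: it only mimics the counting summarized
# in the table, based on the @info/@dist annotations of the application above.
ANNOTATED_ELEMENT = re.compile(
    r"@info\(name\s*=\s*'(?P<name>[^']+)'\)\s*@dist\((?P<dist>[^)]*)\)"
)
DIST_OPTION = re.compile(r"(\w+)\s*=\s*'([^']*)'")


def summarize_exec_groups(app_text):
    """Return {execGroup: (instance_count, [query names])}."""
    groups = {}
    for match in ANNOTATED_ELEMENT.finditer(app_text):
        options = dict(DIST_OPTION.findall(match.group("dist")))
        group = options["execGroup"]
        # Assumption: a missing parallel value means one instance.
        parallel = int(options.get("parallel", "1"))
        count, names = groups.setdefault(group, (parallel, []))
        names.append(match.group("name"))
    return groups


# Annotation lines copied from the Siddhi application above.
APP_ANNOTATIONS = """
@info(name = 'monitered-filter')@dist(execGroup='001')
@info(name = 'power-increase-pattern')@dist(parallel='2', execGroup='002')
@info(name = 'power-range-filter')@dist(parallel='2', execGroup='003')
@info(name = 'internal-filter')@dist(execGroup='004')
"""

for group, (count, names) in sorted(summarize_exec_groups(APP_ANNOTATIONS).items()):
    print(group, count, ", ".join(names))
```

Run against the four annotation lines, the sketch prints one row per execution group with the same instance counts as the table (1, 2, 2, 1).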