Incremental Analysis
Incremental aggregation allows you to obtain aggregates in an incremental manner for a specified set of time periods.
This not only allows you to calculate aggregations with varied time granularity, but also allows you to access them in an interactive manner for reports, dashboards, and for further processing. Its schema is defined via the aggregation definition.
Configuring aggregation queries
This section explains how to calculate aggregates in an incremental manner for different sets of time periods, store the calculated values in a data store and then retrieve that information.
To demonstrate this, consider a scenario where a business that sells multiple brands stores its sales data in a physical
database for the purpose of retrieving them later to perform sales analysis. Each sales transaction is received with the following details:
symbol: The symbol that represents the brand of the items sold.
price: The price at which each item was sold.
amount: The number of items sold.
The Sales Analyst needs to retrieve the total number of items sold of each brand per month, per week, per day etc., and then retrieve these totals for specific time durations to prepare sales analysis reports.
To address the above use case, Siddhi queries need to be designed in two steps as follows:
Step 1: Calculate and persist time-based aggregate values
Before you begin:
Before creating queries to calculate and persist aggregate values, the following prerequisites need to be completed:
When you are defining a physical store to store aggregate values, you need to first download, install and set up the required database type. For more information about setting up the database, see Defining Tables for Physical Stores.
If the aggregation query included in the Siddhi application is only for read-only purposes, disable data purging.
To write the required Siddhi queries to calculate and persist aggregate values required for the scenario outlined above, follow the steps below:
1. To capture the input events based on which the aggregations are calculated, define an input stream named TradeStream. The stream definition includes attributes named symbol, price and quantity to capture the details described above (quantity holds the number of items sold). In addition, it has an attribute named timestamp to capture the time at which the sales transaction occurs. The aggregations are executed based on this time.
2. To persist the aggregates that are calculated via your Siddhi application, include a store definition via the @store annotation. If no store is defined, the data is stored in-memory and is lost when the Siddhi application is stopped.
3. Define an aggregation. You can name it TradeAggregation.
4. To calculate aggregations, include a query as follows:
a. To get input events from the TradeStream stream that you previously defined, add a from clause (from TradeStream).
b. To select attributes to be included in the output event, add a select clause (select symbol, avg(price) as avgPrice, sum(quantity) as total). Based on this, each output event has the symbol, avgPrice and total attributes. The average price and the total number of items are the aggregates calculated in this scenario.
c. To group the output by the symbol, add a group by clause (group by symbol). The group by clause is optional in incremental aggregations. If it is not provided, all the events are aggregated together for each duration.
d. The timestamp included in each input event allows you to calculate aggregates for time granularities ranging from seconds to years. Therefore, to calculate aggregates for each time granularity within this range, add the aggregate by clause to this aggregate query (aggregate by timestamp every sec ... year). This results in the average price and the total number of items sold being calculated per second, minute, hour, day, month and year. These time-based calculations are saved in the MySQL store you defined.
The complete Siddhi application with all the definitions and queries added looks as follows.
@App:name('TradeApp')
define stream TradeStream (symbol string, price double, quantity long, timestamp long);
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/TestDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
define aggregation TradeAggregation
from TradeStream
select symbol, avg(price) as avgPrice, sum(quantity) as total
group by symbol
aggregate by timestamp every sec ... year;
The following is a list of optional annotations supported for incremental aggregation.
@store(type=<store type>, ...)
The aggregated results are stored in the database defined via this annotation in tables corresponding to each aggregate duration. For more information on how to define a store, see Defining Tables for Physical Stores. If a store is not defined via this annotation, the aggregated results are stored in in-memory tables by default. Adding a specific store definition is useful in a production environment because the aggregations stored in in-memory tables can be lost if a system failure occurs.
The names of the tables are created in the <AGGREGATION_NAME>_<DURATION> format. In this scenario, the following tables are created:
TradeAggregation_SECONDS
TradeAggregation_MINUTES
TradeAggregation_HOURS
TradeAggregation_DAYS
TradeAggregation_MONTHS
TradeAggregation_YEARS
The primary keys of these tables are determined as follows:
AGG_TIMESTAMP and AGG_EVENT_TIMESTAMP are internally calculated values that reflect the time bucket of an aggregation for a particular duration. For example, for the day aggregations, AGG_TIMESTAMP = 1515110400000 reflects that it is the aggregation for the 5th of January 2018 (1515110400000 is the Unix time in milliseconds for 2018-01-05 00:00:00). All aggregations are based on the GMT timezone.
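The time bucket idea can be sketched outside Siddhi. The following Python snippet is only an illustration of flooring an event timestamp to the start of its GMT bucket. The function name bucket_start_ms and the granularity keys are hypothetical and are not part of Siddhi, and the real engine may compute these values differently.

```python
from datetime import datetime, timezone

# Fields reset to their minimum value when flooring to each granularity.
# The granularity keys are illustrative names, not Siddhi identifiers.
_RESET = {
    "sec": {},
    "min": {"second": 0},
    "hour": {"minute": 0, "second": 0},
    "day": {"hour": 0, "minute": 0, "second": 0},
    "month": {"day": 1, "hour": 0, "minute": 0, "second": 0},
    "year": {"month": 1, "day": 1, "hour": 0, "minute": 0, "second": 0},
}


def bucket_start_ms(timestamp_ms: int, granularity: str) -> int:
    """Return the start (Unix ms) of the GMT time bucket containing timestamp_ms."""
    # Work in whole seconds so that sub-second precision is dropped up front.
    moment = datetime.fromtimestamp(timestamp_ms // 1000, tz=timezone.utc)
    floored = moment.replace(**_RESET[granularity])
    return int(floored.timestamp()) * 1000


# An event a few hours into 5 January 2018 (GMT).
event_time = 1515110400000 + 12_345_678
day_bucket = bucket_start_ms(event_time, "day")
print(day_bucket)
```

Under these assumptions, the event falls into the day bucket 1515110400000, which is the value used in the example above.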
The other values stored in the table are aggregations and other function calculations carried out via the aggregation definition. If any such function calculation is not an aggregation, the output value corresponds with the function calculation for the latest event of that time bucket. For example, if a multiplication is carried out, the stored value is the multiplication result for the latest event of that duration.
Certain aggregations are internally stored as a collection of other aggregations. For example, the average aggregation is internally calculated as a function of sum and count. Hence, the table only reflects a sum and a count. The actual average is returned when the user retrieves the aggregate values as described in Step 2: Retrieve calculated and persisted aggregate values.
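To see why an average is kept as a sum and a count, consider the following Python sketch. It is only an illustration with made-up prices. The PartialAverage class is hypothetical and does not mirror Siddhi's internal classes.

```python
from dataclasses import dataclass


@dataclass
class PartialAverage:
    """Hypothetical record holding what a table row would keep for avg()."""
    total: float = 0.0  # running sum of the values
    count: int = 0      # number of values seen

    def add(self, value: float) -> None:
        self.total += value
        self.count += 1

    def merge(self, other: "PartialAverage") -> "PartialAverage":
        # Sums and counts from finer buckets can simply be added together.
        return PartialAverage(self.total + other.total, self.count + other.count)

    def average(self) -> float:
        return self.total / self.count


# Two finer-grained buckets with made-up prices.
first = PartialAverage()
for price in (10.0, 20.0, 30.0):
    first.add(price)
second = PartialAverage()
second.add(50.0)

merged = first.merge(second)
average_of_averages = (first.average() + second.average()) / 2
print(merged.average(), average_of_averages)
```

The merged sum and count give the true average of all four values, whereas averaging the two bucket averages does not. This is the reason a coarser duration can be derived from a finer one only when the sum and the count are both available.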
In the scenario described in this section, a timestamp is assigned to each input event via the timestamp attribute, and symbol is the group by key. Therefore, the TestDB store defined via the @store annotation uses the AGG_EVENT_TIMESTAMP and symbol attributes as primary keys. Each record in this store must have a unique combination of values for these two attributes.
@purge
This specifies whether automatic data purging is enabled or not. If automatic data purging is enabled, you need to specify the time interval at which the data purging should be carried out. In addition, you need to specify the time period for which the data should be retained based on the granularity by including the @retentionPeriod annotation (described below). If this annotation is not included in an incremental aggregation query, the data purging is carried out every 15 minutes based on the default retention periods mentioned in the description of the @retentionPeriod annotation.
@retentionPeriod
This specifies the time period for which data needs to be retained before it is purged, based on the granularity. The retention period defined for each granularity should be greater than or equal to the minimum retention period as given below:
second: 120 seconds
minute: 120 minutes
hour: 25 hours
day: 32 days
month: 13 months
year: none
If the retention period is not specified, the default retention period for each granularity is applied as given below:
second: 120 seconds
minute: 24 hours
hour: 30 days
day: 1 year
month: All
year: All
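The interaction between the minimum and default retention periods listed above can be sketched as follows. This Python snippet is only an illustration. The dictionary keys and the effective_retention function are hypothetical, a month is approximated as 30 days and a year as 365 days, and None stands for "no minimum" or "keep all".

```python
from datetime import timedelta
from typing import Optional

DAY = timedelta(days=1)

# Minimum retention per granularity (None = no minimum).
# Assumption: 13 months is approximated as 13 * 30 days.
MIN_RETENTION = {
    "sec": timedelta(seconds=120),
    "min": timedelta(minutes=120),
    "hours": timedelta(hours=25),
    "days": 32 * DAY,
    "months": 13 * 30 * DAY,
    "years": None,
}

# Default retention per granularity (None = keep all).
# Assumption: 1 year is approximated as 365 days.
DEFAULT_RETENTION = {
    "sec": timedelta(seconds=120),
    "min": timedelta(hours=24),
    "hours": 30 * DAY,
    "days": 365 * DAY,
    "months": None,
    "years": None,
}


def effective_retention(granularity: str,
                        configured: Optional[timedelta] = None) -> Optional[timedelta]:
    """Pick the retention to apply, rejecting values below the minimum."""
    if configured is None:
        return DEFAULT_RETENTION[granularity]
    minimum = MIN_RETENTION[granularity]
    if minimum is not None and configured < minimum:
        raise ValueError(
            f"retention for '{granularity}' must be at least {minimum}"
        )
    return configured


print(effective_retention("min"))
print(effective_retention("hours", timedelta(hours=25)))
```

In this sketch, omitting a retention period falls back to the default for that granularity, and a configured value shorter than the minimum is rejected.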
Step 2: Retrieve calculated and persisted aggregate values
This step involves retrieving the aggregate values that you calculated and persisted in Step 1. To do this, let's add the Siddhi definitions and queries required for retrieval to the TradeApp Siddhi application that you have already started creating for this scenario.
Retrieval logic for the same aggregation can be defined in a different Siddhi application. However, only one aggregation should carry out the processing (i.e., the aggregation input stream should only feed events to one aggregation definition).
For this scenario, let's assume that the Sales Analyst needs to retrieve the sales totals for the time duration between 15th February 2014 and 16th March 2014.
To retrieve aggregations, you need to make retrieval requests. To capture these requests as events, let's define a stream named TradeSummaryRetrievalStream that includes a symbol attribute.
To process the events captured via the TradeSummaryRetrievalStream stream you defined, add a new query that joins this stream with the TradeAggregation aggregation.
This query matches events in the TradeSummaryRetrievalStream stream and the TradeAggregation aggregation that was defined in step 1 based on the value for the symbol attribute, and performs a join for each matching pair. Based on that join, it produces one or more output events with the symbol, total and avgPrice attributes for the day granularity within the time range 2014-02-15 00:00:00 to 2014-03-16 00:00:00. The time zone is represented by +05:30.
Note the following about the query given above:
The on condition is optional for retrieval.
You can provide the within duration in two ways as follows:
within <start_time>, <end_time>: The <start_time> and <end_time> can be STRING or LONG values. If it is a string value, the format needs to be <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> (<Z> represents the timezone. It can be omitted if the time is in GMT). You can provide long values if you need to specify the times as Unix timestamps (in milliseconds). In the above query, you are specifying the time duration via this method in the STRING format.
within <within_duration>: This method allows you to enter the time duration only as a STRING value. The format can be one of the following:
<yyyy>-**-** **:**:** <Z>
<yyyy>-<MM>-** **:**:** <Z>
<yyyy>-<MM>-<dd> **:**:** <Z>
<yyyy>-<MM>-<dd> <HH>:**:** <Z>
<yyyy>-<MM>-<dd> <HH>:<mm>:** <Z>
<yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z>
For example, if the duration is specified as 2018-01-** **:**:**, it means within the first month of 2018. This is equal to the "2018-01-01 00:00:00", "2018-02-01 00:00:00" clause provided as per the previous method. You do not need to specify the timezone via <Z> if the time is in GMT.
The per clause specifies the time granularity for which the aggregations need to be retrieved. The value for this clause can be seconds, minutes, hours, days, months, or years. The time granularity for which you want to retrieve values must have been included in the time range you specified when calculating and persisting aggregates. For more information, see Step 1: Calculate and persist time-based aggregate values (step 4, substep d).
The output contains the total and avgPrice per symbol for all the days falling within the given time range.
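The relationship between the two within forms described above can be illustrated with a short Python sketch that expands a wildcard duration into the equivalent start and end times. This is only a model of the documented behaviour, not Siddhi's implementation. The expand_within function is hypothetical, the timezone suffix <Z> is not handled (GMT is assumed), and the end time is taken to be the start of the next period, as in the 2018-01 example.

```python
import re
from datetime import datetime, timedelta
from typing import Tuple

_PATTERN = re.compile(
    r"^(\d{4})-(\d{2}|\*\*)-(\d{2}|\*\*) (\d{2}|\*\*):(\d{2}|\*\*):(\d{2}|\*\*)$"
)
_FORMAT = "%Y-%m-%d %H:%M:%S"
# Smallest valid value for year, month, day, hour, minute, second.
_MINIMUMS = [1, 1, 1, 0, 0, 0]
# Step to add when the finest specified field is day, hour, minute or second.
_STEPS = [timedelta(days=1), timedelta(hours=1),
          timedelta(minutes=1), timedelta(seconds=1)]


def expand_within(duration: str) -> Tuple[str, str]:
    """Expand a wildcard duration into (start, end) strings, assuming GMT."""
    match = _PATTERN.match(duration)
    if match is None:
        raise ValueError(f"unsupported duration: {duration!r}")
    parts = match.groups()
    # Count the leading specified fields; wildcards may only follow them.
    specified = next((i for i, p in enumerate(parts) if p == "**"), 6)
    if any(p != "**" for p in parts[specified:]):
        raise ValueError("wildcards must come after all specified fields")
    values = [int(p) for p in parts[:specified]] + _MINIMUMS[specified:]
    start = datetime(*values)
    if specified == 1:    # only the year is given
        end = start.replace(year=start.year + 1)
    elif specified == 2:  # year and month are given
        end = start.replace(year=start.year + start.month // 12,
                            month=start.month % 12 + 1)
    else:                 # day, hour, minute or second is the finest field
        end = start + _STEPS[specified - 3]
    return start.strftime(_FORMAT), end.strftime(_FORMAT)


january = expand_within("2018-01-** **:**:**")
print(january)
```

With these assumptions, the pattern for January 2018 expands to the same pair of timestamps given in the example for the first method.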
The following are other ways in which you can construct the query to retrieve aggregate values:
Instead of providing the within and per values as constants in the query itself, you can retrieve them via attributes in the input stream definition.
Define the input stream so that it includes the startTime, endTime and perDuration attributes.
This schema allows you to enter the start time and the end time of the time duration for which you want to retrieve aggregates in the aggregate retrieval request by including values for the startTime and endTime attributes. The time granularity for which the aggregations need to be retrieved can be specified as the value for the perDuration attribute.
Then add the query. Here, the within and per clauses refer to the attributes in the input stream that specify the time duration and the per duration.
If you want the retrieved aggregates to be sorted in ascending order based on time, include AGG_TIMESTAMP in the select clause. Once it is included in the select clause, you can add the order by AGG_TIMESTAMP clause.
Incremental aggregation in single-node deployments and HA deployments
To understand how incremental aggregation is carried out in different deployments, consider the following example that explains how the aggregation is executed internally.
Assume that six events arrive in the following sequence, with the given timestamps.
event 0 → 2018-01-01 05:59:58
event 1 → 2018-01-01 05:59:58
event 2 → 2018-01-01 05:59:59
event 3 → 2018-01-01 06:00:00
event 4 → 2018-01-01 06:00:01
event 5 → 2018-01-01 06:00:02
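In this scenario, the aggregation is done for second, minute and hour durations. Therefore, based on the above timestamps, the second, minute, and hour during which each event occurred is as follows.
Event | Second | Minute | Hour |
|---|---|---|---|
0 | 2018-01-01 05:59:58 | 2018-01-01 05:59:00 | 2018-01-01 05:00:00 |
1 | 2018-01-01 05:59:58 | 2018-01-01 05:59:00 | 2018-01-01 05:00:00 |
2 | 2018-01-01 05:59:59 | 2018-01-01 05:59:00 | 2018-01-01 05:00:00 |
3 | 2018-01-01 06:00:00 | 2018-01-01 06:00:00 | 2018-01-01 06:00:00 |
4 | 2018-01-01 06:00:01 | 2018-01-01 06:00:00 | 2018-01-01 06:00:00 |
5 | 2018-01-01 06:00:02 | 2018-01-01 06:00:00 | 2018-01-01 06:00:00 |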
As mentioned before, when storing time-based aggregates, a table is created for each time granularity in the <AGGREGATE_NAME>_<TIME_GRANULARITY> format. Assuming that the name of the aggregation is TradeAggregation, three tables are created as follows.
TradeAggregation_SECONDS
TradeAggregation_MINUTES
TradeAggregation_HOURS
The incremental analysis related execution that is carried out by the system with the arrival of each event is described in the table below.
Arriving Event | Execution |
|---|---|
0 | The system initially processes event 0 at the |
1 | This event also occurs during the 58th second (same as event 0). Therefore, the system aggregates events 0 and 1 together. |
2 | Event 2 arrives during the 59th second. The 58th second has elapsed at the time of this event arrival. Therefore, the system does the following:
|
3 | Event 3 arrives during the second 0 of 06.00 minute. With this arrival, the aggregations for the 59th second and 59th minute expire. Therefore, the system does the following:
|
4 | At the time event 4 arrives during the 1st second of the 06.00 minute, second 0 of the same minute has elapsed. Therefore, the system does the following:
|
5 | Event 5 arrives during the 2nd second. At this time, the 1st second has elapsed. Therefore, the system does the following:
|
The following is the aggregation status after all six events have arrived.
In-memory Table | Available Aggregation Records | Processing In-Memory |
|---|---|---|
|
| Aggregation for |
| Aggregation for | Aggregation for 2018-01-01 06:00:00 (i.e., minute 0 of the 6th hour. Here, event 3 and 4 are aggregated together.) |
| None | Aggregation for 2018-01-01 05:00:00 (i.e., the 5th hour. Here, events 0,1, and 2 are aggregated together.) |
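The end state summarized above can be reproduced with a simplified Python model of cascading executors, one per granularity. This is a sketch under stated assumptions, not the engine's actual implementation. The Bucket and Executor classes are hypothetical, events are assumed to arrive in timestamp order, and each event is given a made-up quantity of 1. In this simplified cascade, a coarser bucket is flushed only when a finer bucket from a later period is forwarded to it, so the timing of intermediate steps may differ from the walkthrough above. Only the state after event 5 is compared.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class Bucket:
    start_ms: int
    total: int = 0  # sum of quantity
    count: int = 0  # number of raw events aggregated


@dataclass
class Executor:
    """Hypothetical per-granularity executor with one running bucket."""
    name: str
    width_ms: int
    next_executor: Optional["Executor"] = None
    running: Optional[Bucket] = None                   # processing in-memory
    table: List[Bucket] = field(default_factory=list)  # completed records

    def receive(self, ts_ms: int, total: int, count: int) -> None:
        start = ts_ms - ts_ms % self.width_ms
        if self.running is not None and self.running.start_ms != start:
            # The running bucket has expired: store it and pass it upwards.
            expired = self.running
            self.table.append(expired)
            if self.next_executor is not None:
                self.next_executor.receive(
                    expired.start_ms, expired.total, expired.count)
            self.running = None
        if self.running is None:
            self.running = Bucket(start)
        self.running.total += total
        self.running.count += count


def ms(text: str) -> int:
    """Parse 'YYYY-MM-DD HH:MM:SS' as GMT and return Unix milliseconds."""
    parsed = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp()) * 1000


def run_example():
    hours = Executor("TradeAggregation_HOURS", 3_600_000)
    minutes = Executor("TradeAggregation_MINUTES", 60_000, hours)
    seconds = Executor("TradeAggregation_SECONDS", 1_000, minutes)
    arrivals = ["05:59:58", "05:59:58", "05:59:59",
                "06:00:00", "06:00:01", "06:00:02"]
    for clock in arrivals:
        seconds.receive(ms(f"2018-01-01 {clock}"), total=1, count=1)
    return seconds, minutes, hours


seconds, minutes, hours = run_example()
for executor in (seconds, minutes, hours):
    print(executor.name, len(executor.table), executor.running)
```

In this model, the running minute bucket holds events 3 and 4, the running hour bucket holds events 0, 1 and 2, and event 5 exists only in the running second bucket.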
Now let's consider how the scenario described above works in a single node deployment and an HA deployment.
Single node deployment
If the @store configuration is not provided, the system stores older aggregations (i.e., aggregations that are already completed for a particular second, minute etc., and therefore not running in-memory) in in-memory tables. In such a situation, a server failure results in all the aggregations done up to the time of the failure being lost.
If a database configuration is provided via the @store annotation, the older aggregations are stored in the external database. Therefore, if the node fails, the system recreates the in-memory running aggregations from this stored data once you restart the server. In the given scenario, in-memory aggregations for the MINUTE execution level can be recreated with data in the TradeAggregation_SECONDS table (i.e., points 3 and 4 in the above aggregation status summary table). Similarly, in-memory aggregations for the HOUR execution level can be recreated from the TradeAggregation_MINUTES table.
However, the recreation described above cannot be done for the most granular duration for which aggregation is done. For example, in the given scenario, the system cannot recreate the in-memory aggregations for the SECOND execution level because there is no database table for a prior duration.
Assume that in the given scenario, a server failure occurs after all six events have arrived. Once you restart the server, only event 5 is lost because that was the only aggregation being executed for the SECOND execution level in the in-memory tables at that time.
HA deployment
In a minimum HA setup, one node runs as the active node and the other as the passive node. No events are lost even if the @store configuration is not provided, because snapshots of the current state of SP are taken. When a snapshot of the SP state is created, the system does not recreate aggregations from tables because it is not required. The newly active node (which was previously passive) continues to process the new events that arrive after the failure of the other node, as it would if no server failure had occurred.
The following is a summary of the retrievability of aggregates based on how they are stored and how WSO2 SP is deployed.
Scaling through distributed aggregations
Distributed aggregations partially process aggregations in different nodes. This allows you to assign one node to process only a part of an aggregation (regional scaling, etc.). To do this, all the aggregations must have a physical database and must be linked to the same database.
Syntax
The @PartitionById annotation must be added before the aggregation definition as shown below.
@store(type="<store type>", ...)
@PartitionById(enable='true')
define aggregation <aggregator name>
from <input stream>
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
group by <attribute name>
aggregate by <timestamp attribute> every <time periods> ;
You can enable all Siddhi applications to partition aggregations in this manner by adding the following system parameters in the <SP_HOME>/conf/<PROFILE>/deployment.yaml file under siddhi.
siddhi:
  properties:
    partitionById: true
    shardId: wso2-sp
A unique ID must be provided for each node via a system parameter named shardId. This parameter is required when partitioning aggregations is enabled for a single Siddhi application as well as when it is enabled for all Siddhi applications.
To maintain data consistency, do not change the shard IDs after the first configuration.
Example
The following query can be included in two Siddhi applications in two different nodes that are connected to the same database. Separate input events are generated for each node. Each node performs the aggregations and stores the results in the database. When the aggregations are retrieved, the collective result of both nodes is considered.
define stream TradeStream (symbol string, price double, quantity long, timestamp long);
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/TestDB", username="root", password="root" , jdbc.driver.name="com.mysql.jdbc.Driver")
@PartitionById(enable='true')
define aggregation TradeAggregation
from TradeStream
select symbol, avg(price) as avgPrice, sum(quantity) as total
group by symbol
aggregate by timestamp every sec ... year;
Let's assume that the following input events were generated for the two nodes during a specific hour.
Node 1
Event | symbol | price | quantity |
|---|---|---|---|
1 | wso2 | 100 | 10 |
2 | wso2 | 100 | 20 |
Node 2
Event | symbol | price | quantity |
|---|---|---|---|
1 | wso2 | 100 | 10 |
2 | wso2 | 100 |