Incremental processing is a processing method in which only the data newly added to a dataset since the last run is processed, instead of re-processing the complete dataset. This improves efficiency by eliminating the overhead of re-processing data that has already been processed.
For example, consider an Analytics table whose data needs to be summarized per day. The following table illustrates how the dataset is processed when the summarization script is run.
| | Without Incremental Processing | With Incremental Processing |
|---|---|---|
| 1st run | The complete dataset is processed to summarize the data. | The complete dataset is processed to summarize the data. |
| 2nd day | The complete dataset is processed to summarize the data. | Only the updates made to the dataset during the previous day are processed to summarize the data. |
Once the summarization is complete, you need to commit the status to indicate that the data was successfully processed. This is done via the following command.

```sql
INCREMENTAL_TABLE_COMMIT orders;
```
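As an illustration, a daily run typically pairs a summarization query with the commit. The following is a sketch only; the `orderSummary` table, its columns, and the aggregation query are illustrative assumptions, not taken from the product documentation:

```sql
-- Hypothetical summarization step: aggregate the newly read orders
-- per customer into an (assumed) summary table.
insert into table orderSummary
  select customerID, sum(cost) as totalCost
  from orders
  group by customerID;

-- Commit the processed status so the next run reads only new data.
INCREMENTAL_TABLE_COMMIT orders;
```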
Publishing events
Incremental analytics uses the timestamps of the published events to determine which data to retrieve for processing.
When defining event streams for incremental analytics, you can add an extra attribute named `_timestamp` of the `LONG` attribute type to the event payload. This allows a specific timestamp to be specified for each event. For more information, see Understanding Event Streams and Event Tables - Adding an event stream.
If the `_timestamp` attribute is not specified in the event stream definition, the timestamp of each event is derived from the system date at the time the event was persisted in the database.
Syntax
```sql
create temporary table orders using CarbonAnalytics options (tableName "ORDERS", schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i", incrementalParams "orders, DAY");
```
In order to apply incremental processing to an Analytics table, the `incrementalParams` attribute should be added to the table definition as shown in the extract above. If this parameter is not added, the table is considered a typical analytics table, and the complete table is processed for each query.
The `incrementalParams` attribute should be added to the definition of the table from which the data is read.
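To make the distinction concrete, the following sketch defines both tables involved in a summarization: only the source table (`orders`), which the script reads from, carries `incrementalParams`; the summary table it writes to is a typical analytics table. The `ORDER_SUMMARY` table name and its schema are illustrative assumptions:

```sql
-- Source table (read side): declares incrementalParams.
create temporary table orders using CarbonAnalytics options (
  tableName "ORDERS",
  schema "customerID STRING, phoneType STRING, OrderID STRING, cost DOUBLE, _timestamp LONG -i",
  incrementalParams "orders, DAY");

-- Summary table (write side): a typical analytics table, no incrementalParams.
-- ORDER_SUMMARY and its columns are assumptions for this sketch.
create temporary table orderSummary using CarbonAnalytics options (
  tableName "ORDER_SUMMARY",
  schema "customerID STRING, totalCost DOUBLE");
```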
Parameters
The following parameters are configured to support incremental processing as shown in the above sample syntax.
Unique ID
| Name | `uniqueID` |
|---|---|
| Required/Optional | Required |
| Description | This is the unique ID of the incremental analytics definition. This ID should be used in the incremental table commit command as shown in the sample syntax. |
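For example, in the sample syntax above the unique ID is `orders` (the first value of `incrementalParams "orders, DAY"`), so the same ID must be passed to the commit command:

```sql
-- The unique ID declared in incrementalParams must match the ID
-- given to the commit command.
INCREMENTAL_TABLE_COMMIT orders;
```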
Time Period
| Name | `timePeriod` |
|---|---|
| Required/Optional | Required |
| Description | The duration of the time period that you are processing, e.g., `DAY` as used in the sample syntax above. If you specify `DAY`, the next run starts processing from the beginning of the day to which the last processed event belongs, so that a partially processed day is re-summarized in full. |
| Example | Consider a list of events from which the number of orders placed during each day must be calculated. The summarization script is run on 27th May 2016, at 12.00 noon. As a result, the summarized table for 27th May 2016 has 1 event (because the other events that arrived on 27th May 2016 were received after 12.00 noon). The next time the script is run, WSO2 DAS checks the timestamp of the event that was processed last. Because that event was received on 27th May 2016, the entry for 27th May 2016 is updated to include the events received after 12.00 noon. |
The following sample further demonstrates how incremental processing is carried out.