Incremental processing is a method in which only the data newly added to a dataset is processed, instead of re-processing the complete dataset when the existing data has already been processed. This improves efficiency by eliminating the overhead of re-processing data that has already been processed.

e.g., The data in an Analytics table needs to be summarized per day. The following table illustrates how the dataset is processed when the summarization script is run.


| | Without Incremental Processing | With Incremental Processing |
|---|---|---|
| 1st Run | The complete dataset is processed to summarize the data. | The complete dataset is processed to summarize the data. |
| 2nd Day | The complete dataset is processed to summarize the data. | Only the updates made to the dataset during the previous day are processed to summarize the data. |

Once the summarization is complete, you need to commit the status to indicate that the data was successfully processed. This is done via the following command.
INCREMENTAL_TABLE_COMMIT orders; 
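
The role of the commit can be illustrated with a small Python model. This is only a sketch under stated assumptions: the `IncrementalTable` class and its methods are hypothetical names invented for illustration, not part of the DAS API, and the model compares raw timestamps without aligning them to a time period.

```python
class IncrementalTable:
    """Toy model of an incrementally read table (hypothetical, not the DAS API)."""

    def __init__(self):
        self.rows = []          # (timestamp, payload) pairs
        self.committed = None   # watermark stored by the last commit
        self._pending = None    # highest timestamp seen by the current run

    def read(self):
        """Return only the rows newer than the committed watermark."""
        new_rows = [r for r in self.rows
                    if self.committed is None or r[0] > self.committed]
        if new_rows:
            self._pending = max(ts for ts, _ in new_rows)
        return new_rows

    def commit(self):
        """Record that everything read so far was processed successfully."""
        if self._pending is not None:
            self.committed = self._pending
            self._pending = None


orders = IncrementalTable()
orders.rows += [(1, "a"), (2, "b")]

first = orders.read()       # both rows: nothing has been committed yet
retry = orders.read()       # still both rows: the run was never committed
orders.commit()             # mark rows up to timestamp 2 as processed

orders.rows.append((3, "c"))
second = orders.read()      # only the row added after the commit
```

In this model, a run that fails before the commit leaves the watermark untouched, so the same rows are picked up again by the next run.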

Publishing events

Incremental analytics uses the timestamps of the events sent when retrieving the data for processing.

...

Name: uniqueID
Required/Optional: Required
Description: This is the unique ID of the incremental analytics definition. This ID should be used in the incremental table commit command as shown in the sample syntax.

Time Period
Name: timePeriod
Required/Optional: Required
Description: The duration of the time period that you are processing. This can be MINUTE, HOUR, DAY, MONTH or YEAR. DAS has the ability to process the timestamp of each event and identify the unit of time during which it was sent.

e.g., If you specify HOUR as the time period, an event sent at 10:20 PM is identified as an event sent during the 22:00 - 23:00 hour.
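
Identifying the unit of time for an event amounts to truncating its timestamp to the start of that unit. The following Python sketch shows one way to do this. The function name `period_start` is hypothetical, the date used is arbitrary, and DAS may implement this step differently.

```python
from datetime import datetime


def period_start(ts: datetime, unit: str) -> datetime:
    """Truncate ts to the start of the MINUTE, HOUR, DAY, MONTH or YEAR it falls in."""
    if unit == "MINUTE":
        return ts.replace(second=0, microsecond=0)
    if unit == "HOUR":
        return ts.replace(minute=0, second=0, microsecond=0)
    if unit == "DAY":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "MONTH":
        return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    if unit == "YEAR":
        return ts.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported time period: {unit}")


sent_at = datetime(2016, 5, 27, 22, 20)       # an event sent at 10:20 PM
hour_bucket = period_start(sent_at, "HOUR")   # start of the 22:00 hour
```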

Example

The following is a list of events, and it is required to calculate the number of orders placed during each day.

| Customer ID | Phone Type | Order ID | Cost | _timestamp |
|---|---|---|---|---|
| 1 | Nexus 5x | 33slsa2s | 400 | 26th May 2016 12:00:01 |
| 12 | Galaxy S7 | kskds221 | 600 | 27th May 2016 02:00:02 |
| 43 | iPhone 6s | sadl3122 | 700 | 27th May 2016 15:32:04 |
| 2 | MoTo X | sdda221s | 350 | 27th May 2016 16:22:10 |
| 32 | LG G5 | lka2s24dkQ | 550 | 27th May 2016 19:42:42 |

The summarization script is run on 27th May 2016, at 12.00 noon. The last processed order ID is kskds221.

Then the summarization table is as shown below.

| Day | Event Count |
|---|---|
| 26th May 2016 | 1 |
| 27th May 2016 | 1 |


As a result, the summarized table for 27th May 2016 should have one event (because the other events that arrived on 27th May 2016 were received after 12.00 noon).

The next time the script is run, WSO2 DAS checks the timestamp of the event that was processed last. In this example, the last processed event is order ID kskds221 with timestamp 27th May 2016 02:00:02. Then DAS retrieves the data from the time period starting at 27th May 2016 00:00:00 to process all the data that has a timestamp greater than 27th May 2016 00:00:00.

This updates the value for the entry 27th May 2016 with the value 4 in the summarization table. The summarization table is not affected by any data received before 26th May 2016 because no data is retrieved from that time period. The resulting summarization table is shown below.

| Day | Event Count |
|---|---|
| 26th May 2016 | 1 |
| 27th May 2016 | 4 |
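
The two runs in this example can be traced with a short Python sketch. It is a simplified model rather than the DAS implementation: the function `run_summary` is a hypothetical name, only the event timestamps are used, and the start of the day is treated as inclusive.

```python
from collections import Counter
from datetime import datetime

# Timestamps of the five events in the example.
EVENTS = [
    datetime(2016, 5, 26, 12, 0, 1),
    datetime(2016, 5, 27, 2, 0, 2),
    datetime(2016, 5, 27, 15, 32, 4),
    datetime(2016, 5, 27, 16, 22, 10),
    datetime(2016, 5, 27, 19, 42, 42),
]


def run_summary(events, summary, last_processed):
    """Update summary (day -> event count) in place; return the new last-processed timestamp."""
    if last_processed is None:
        batch = list(events)  # first run: process everything available
    else:
        # Re-read from the start of the day that holds the last processed event.
        day_start = last_processed.replace(hour=0, minute=0, second=0, microsecond=0)
        batch = [ts for ts in events if ts >= day_start]
    for day, count in Counter(ts.date() for ts in batch).items():
        summary[day] = count  # overwrite the entry for that day, do not add to it
    return max(batch, default=last_processed)


summary = {}
noon = datetime(2016, 5, 27, 12, 0, 0)

# First run at noon: only the first two events have arrived.
last = run_summary([ts for ts in EVENTS if ts <= noon], summary, None)
after_first = dict(summary)

# Second run: all five events are available.
last = run_summary(EVENTS, summary, last)
```

Because the second run recounts the whole of 27th May 2016 and overwrites that entry, the event that was already counted in the first run is not counted twice.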

Previous Time Periods
Name: previousTimePeriods
Required/Optional: Optional
Description: If you add this parameter, WSO2 DAS is able to retrieve data for the summarized table from previous time periods. The number you specify for this parameter is the number of previous time periods to be considered.

Example: If the time period specified is DAY and the number specified for the previousTimePeriods parameter is 3, events sent on 24th, 25th and 26th of May 2016 are also considered when the summarization script is run on 27th May 2016.
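
The effect of this parameter on the retrieval window can be sketched in Python as follows. The sketch covers the DAY time period only and assumes the window is anchored on the day of the last processed event. The function name `window_start` is hypothetical.

```python
from datetime import datetime, timedelta


def window_start(last_processed: datetime, previous_time_periods: int = 0) -> datetime:
    """Start of the retrieval window for a DAY time period."""
    day_start = last_processed.replace(hour=0, minute=0, second=0, microsecond=0)
    # Step back one whole day for each previous time period requested.
    return day_start - timedelta(days=previous_time_periods)


last_processed = datetime(2016, 5, 27, 2, 0, 2)
default_start = window_start(last_processed)        # 27th May 2016 00:00:00
extended_start = window_start(last_processed, 3)    # 24th May 2016 00:00:00
```

MONTH and YEAR periods would need calendar-aware arithmetic instead of a fixed `timedelta`, which this sketch leaves out.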

The following sample further demonstrates how the incremental processing is carried out.


An event stream is defined with the following configuration. For more information about event streams, see Understanding Event Streams and Event Tables.

{
  "streamId": "APIStats:1.0.0",
  "name": "APIStats",
  "version": "1.0.0",
  "nickName": "",
  "description": "",
  "metaData": [],
  "correlationData": [],
  "payloadData": [
    {
      "name": "name",
      "type": "STRING"
    },
    {
      "name": "count",
      "type": "INT"
    },
    {
      "name": "_timestamp",
      "type": "LONG"
    }
  ]
}

The Spark script written to process the data received by this event stream is as follows.

create temporary table APIStats using CarbonAnalytics options (tableName "APIStats", schema "name STRING, count INT, _timestamp LONG", incrementalParams "api_stats_1, MINUTE");

create temporary table APIStatsMinuteSummary using CarbonAnalytics options (tableName "APIStatsMinSummary", schema "name STRING, count INT, _timestamp LONG, year INT, month INT, day INT, hour INT, min INT", primaryKeys "name, min, hour, day, month, year", incrementalParams "api_stats_min_1, HOUR");

create temporary table APIStatsHourSummary using CarbonAnalytics options (tableName "APIStatsHourSummary", schema "name STRING, count INT, _timestamp LONG, year INT, month INT, day INT, hour INT", primaryKeys "name, hour, day, month, year", incrementalParams "api_stats_hour_1, DAY");

create temporary table APIStatsDaySummary using CarbonAnalytics options (tableName "APIStatsDaySummary", schema "name STRING, count INT, _timestamp LONG, year INT, month INT, day INT", primaryKeys "name, day, month, year");

insert into table APIStatsMinuteSummary select name, sum(count) as count, getMinuteStartingTime(getYear(first(_timestamp)), getMonth(first(_timestamp)), getDay(first(_timestamp)), getHour(first(_timestamp)), getMinute(first(_timestamp))) as _timestamp, getYear(first(_timestamp)) as year, getMonth(first(_timestamp)) as month, getDay(first(_timestamp)) as day, getHour(first(_timestamp)) as hour, getMinute(first(_timestamp)) as min from APIStats group by name, getYear(_timestamp), getMonth(_timestamp), getDay(_timestamp), getHour(_timestamp), getMinute(_timestamp);

INCREMENTAL_TABLE_COMMIT api_stats_1;

insert into table APIStatsHourSummary select name, sum(count) as count, getHourStartingTime(year, month, day, hour) as _timestamp, year, month, day, hour from APIStatsMinuteSummary group by name, year, month, day, hour;

INCREMENTAL_TABLE_COMMIT api_stats_min_1;

insert into table APIStatsDaySummary select name, sum(count) as count, getDateStartingTime(year, month, day) as _timestamp, year, month, day from APIStatsHourSummary group by name, year, month, day;

INCREMENTAL_TABLE_COMMIT api_stats_hour_1;

The incrementalParams "api_stats_1, MINUTE" parameter specifies that incremental processing is applied to the APIStats table, with api_stats_1 as the unique ID and MINUTE as the time period. When the script is run, the system identifies the last minute during which the summarization was done, and processes all events with a timestamp that is greater than the timestamp of the start of that minute.
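
The chain of summaries in the script (minute, then hour, then day) can be pictured with the following Python sketch. It only models the grouping and summing. The function `roll_up`, the key layout and the sample counts are hypothetical and are not taken from the DAS sample.

```python
from collections import defaultdict


def roll_up(rows, key_len):
    """Sum counts of rows whose keys share the same first key_len fields."""
    totals = defaultdict(int)
    for key, count in rows.items():
        totals[key[:key_len]] += count
    return dict(totals)


# Keys are (name, year, month, day, hour, minute); values are counts.
minute_rows = {
    ("api", 2016, 5, 27, 10, 1): 3,
    ("api", 2016, 5, 27, 10, 2): 2,
    ("api", 2016, 5, 27, 11, 0): 4,
}

hour_rows = roll_up(minute_rows, 5)   # group by (name, year, month, day, hour)
day_rows = roll_up(hour_rows, 4)      # group by (name, year, month, day)
```

Each level reads the output of the level below it, which is why the script commits each incremental table only after the insert that consumes it has run.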

To use the above sample, you need to add this UDF (download). The associated fully qualified class name entry for spark-udf-config.xml is org.wso2.das.sample.udf.DateTimeUDF.

For detailed instructions on how to add a UDF, see Creating Spark User Defined Functions.