Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

How to enable incremental analysis for a stream

 

To enable incremental analysis for your stream, 

  • Instal a toolbox
  • Set the enableIncrementalIndex property in the streams.properties file that is bundled in the .tbox file of your toolbox

...

It populates the incremental index for that particular stream definition. 

The following sample shows how to enable the property. After enabling, all events received for this stream definition will be indexed for incremental processing.

...

Code Block
CREATE EXTERNAL TABLE IF NOT EXISTS WideRowReadTable
   (orderID STRING, colName STRING, colValue STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
   "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" ,
   "cassandra.columns.mapping" =
   ":key,:column,:value" );  
  

CREATE EXTERNAL TABLE IF NOT EXISTS WideRowWriteTableTest1
   (orderID STRING, colName STRING, colValue STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
   "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi_wide_test_1" ,
   "cassandra.columns.mapping" =
   ":key,:column, :value" );   
                             

@Incremental(name="wideRow", tables="WideRowReadTable", bufferTime="0")
insert overwrite table WideRowWriteTableTest1
   select orderId, colName, colValue from WideRowReadTable JOIN
   (select orderId as rowId from WideRowReadTable where colName="Version" AND colValue="1.0.0")rowIdTable ON WideRowReadTable.orderId=rowIdTable.rowID;

...

  • The above example is basically copying the data from one column family to another column family.
  • :key,:column, :value are basically transpose of the actual row, which basically make row key + colum+key as a row in the hive table, and make the casandra table flat.

...

Usecase 4 - incremental processing with already existing data

Code Block
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable
   (orderID STRING, brandName STRING, userName STRING, quantity INT,
    version STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
     "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" ,
   "cassandra.columns.mapping" =
   ":key,payload_brand, payload_user, payload_quantity, Version" );
 

@Incremental(name="salesAnalysis", tables="PhoneSalesTable", hasNonIndexedData = “true” bufferTime="0")
select brandName, count(DISTINCT orderID),
   sum(quantity) from PhoneSalesTable
   where version= "1.0.0" group by brandName;

...

  • This is similar use case as usecase -1 but here the incremental processing enabled for the stream not from the origin of the event stream and some where in the middle of the event stream.

  • This feature was added since there are users who already have some data being published, and they may want to switch for incremental processing in the middle. Therefore for the first iteration of incremental processing if the property hasNonIndexedData set, it will consider the non-indexed data also.  And then from the second iteration it’ll only consider the indexed data.

...