...
Usecase - 4 (Incremental processing with already existing data)
Code Block |
---|
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE", "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi", "cassandra.columns.mapping" = ":key,payload_brand, payload_user, payload_quantity, Version" ); @Incremental(name="salesAnalysis", tables="PhoneSalesTable", hasNonIndexedData="true", bufferTime="0") select brandName, count(DISTINCT orderID), sum(quantity) from PhoneSalesTable where version="1.0.0" group by brandName; |
This use case is similar to Usecase - 1, but here incremental processing is enabled somewhere in the middle of the event stream rather than from its origin.
This feature was added because some users already have data being published and may want to switch to incremental processing partway through. If the hasNonIndexedData property is set, the first iteration of incremental processing also considers the non-indexed data. From the second iteration onward, only the indexed data is considered.
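The first-iteration behaviour described above can be pictured with a small conceptual model. This is only a sketch, not the actual BAM implementation: the `Row` and `IncrementalCursor` classes, the `indexed_at` field, and the `last_processed` marker are hypothetical names introduced for illustration, and the model assumes that indexed rows carry a timestamp while rows published before indexing was enabled do not.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Row:
    order_id: str
    # Hypothetical: None models a row published before indexing was enabled.
    indexed_at: Optional[int]


@dataclass
class IncrementalCursor:
    """Toy model of one @Incremental marker (assumed semantics, not BAM code)."""
    has_non_indexed_data: bool
    last_processed: int = 0      # upper bound of the previous iteration
    first_run_done: bool = False

    def select(self, rows: List[Row], now: int) -> List[Row]:
        # Indexed rows that arrived since the previous iteration.
        picked = [
            r for r in rows
            if r.indexed_at is not None and self.last_processed < r.indexed_at <= now
        ]
        # Only the first iteration also picks up the old, non-indexed rows.
        if self.has_non_indexed_data and not self.first_run_done:
            picked = [r for r in rows if r.indexed_at is None] + picked
        self.first_run_done = True
        self.last_processed = now
        return picked


rows = [Row("o1", None), Row("o2", None), Row("o3", 5)]
cursor = IncrementalCursor(has_non_indexed_data=True)

first = cursor.select(rows, now=10)
print([r.order_id for r in first])   # ['o1', 'o2', 'o3']

rows.append(Row("o4", 15))
second = cursor.select(rows, now=20)
print([r.order_id for r in second])  # ['o4']
```

In this model the old rows `o1` and `o2` are processed exactly once, in the first iteration, and later iterations see only newly indexed rows, which mirrors the behaviour the paragraph describes.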