A BAM Hive script considers all of the data in a column family during summarization, so data that has already been summarized is summarized again on every run. In most use cases, however, the entire data set does not need to be analyzed again to produce the final result. Therefore, starting from BAM 2.4.0, we have implemented incremental analysis of the data received by BAM, so that a run processes only the data that has arrived since the previous run and uses your resources more efficiently.
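The difference between the two approaches can be sketched in plain Python. This is a conceptual illustration only, not BAM's implementation: `ColumnFamily`, `full_summary` and `IncrementalSummary` are hypothetical names, the "summary" is a simple sum, and the pointer is just a list index standing in for however BAM tracks its position.

```python
from dataclasses import dataclass, field


@dataclass
class ColumnFamily:
    """Hypothetical stand-in for a column family: values kept in arrival order."""
    rows: list = field(default_factory=list)


def full_summary(cf):
    """Non-incremental run: re-reads every row each time.

    Returns (total, rows_processed)."""
    return sum(cf.rows), len(cf.rows)


@dataclass
class IncrementalSummary:
    """Incremental run: keeps a running total plus a pointer to the first unread row.

    This works here because a sum can be extended with new rows."""
    total: int = 0
    pointer: int = 0

    def run(self, cf):
        new_rows = cf.rows[self.pointer:]  # only rows received since the last run
        self.total += sum(new_rows)
        self.pointer = len(cf.rows)
        return self.total, len(new_rows)


cf = ColumnFamily([5, 3])
inc = IncrementalSummary()
print(full_summary(cf), inc.run(cf))  # (8, 2) (8, 2)
cf.rows.append(2)                     # one new row arrives
print(full_summary(cf), inc.run(cf))  # (10, 3) (10, 1)
```

Both approaches reach the same total, but on the second run the incremental version reads one row instead of three.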

...

To enable incremental analysis for your stream:

  • Install a toolbox
  • Set the enableIncrementalIndex property to true in the streams.properties file that is bundled in the .tbox file of your toolbox

...

Code Block
streams.definitions=defn1
streams.definitions.defn1.filename=mystream
streams.definitions.defn1.username=admin
streams.definitions.defn1.password=admin
streams.definitions.defn1.description=This is the datastream published from my BAM/CEP publisher.
streams.definitions.defn1.enableIncrementalIndex=true

...

In this section we'll discuss the possible use cases of the incremental feature. You can add the @Incremental annotation before your Hive query to have it processed incrementally. You also need to give the annotation a name: the query that follows is then processed from the unique pointer associated with that name, which prevents it from clashing with other Hive queries. The possible syntax and each attribute are described in detail below.
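Why the name matters can be sketched with a small pointer store keyed by annotation name. This is a hypothetical model, not BAM's storage: `PointerStore`, `read_new` and the second name `userAnalysis` are invented for illustration, and the pointer is a plain list index.

```python
class PointerStore:
    """Hypothetical per-name pointer store; not BAM's actual implementation."""

    def __init__(self):
        self._pointers = {}  # annotation name -> index of first unread row

    def read_new(self, name, table):
        """Return the rows of `table` added since the last read under `name`."""
        start = self._pointers.get(name, 0)
        self._pointers[name] = len(table)
        return table[start:]


phone_sales = ["order1", "order2"]
store = PointerStore()
print(store.read_new("salesAnalysis", phone_sales))  # ['order1', 'order2']
phone_sales.append("order3")
print(store.read_new("salesAnalysis", phone_sales))  # ['order3']
# A second query with its own name is unaffected by the first one's progress.
print(store.read_new("userAnalysis", phone_sales))   # ['order1', 'order2', 'order3']
```

If both queries shared a single pointer, the second one would see no rows at all, because the first had already advanced it past every row in the table.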

...

Code Block
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable
   (orderID STRING, brandName STRING, userName STRING, quantity INT,
    version STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
     "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" ,
   "cassandra.columns.mapping" =
   ":key,payload_brand, payload_user, payload_quantity, Version" );
 

@Incremental(name="salesAnalysis", tables="PhoneSalesTable", hasNonIndexedData="true", bufferTime="0")
select brandName, count(DISTINCT orderID),
   sum(quantity) from PhoneSalesTable
   where version= "1.0.0" group by brandName;
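For readers less familiar with HiveQL, the aggregation in the query above can be expressed in Python for a single batch of rows. This is an illustrative sketch: rows are assumed to be dicts keyed by the PhoneSalesTable column names, `orderID` and `quantity` are assumed non-null, and the brand names are made up. It does not model how BAM selects the batch or stores results across incremental runs.

```python
from collections import defaultdict


def summarize(rows):
    """Per-batch Python equivalent of the HiveQL query above.

    Each row is a dict keyed by the PhoneSalesTable column names; orderID and
    quantity are assumed non-null (Hive's count/sum would skip NULLs)."""
    orders = defaultdict(set)      # brandName -> distinct orderIDs
    quantities = defaultdict(int)  # brandName -> sum(quantity)
    for row in rows:
        if row["version"] != "1.0.0":  # where version = "1.0.0"
            continue
        brand = row["brandName"]       # group by brandName
        orders[brand].add(row["orderID"])
        quantities[brand] += row["quantity"]
    return {brand: (len(orders[brand]), quantities[brand]) for brand in orders}


batch = [
    {"orderID": "o1", "brandName": "BrandA", "userName": "u1", "quantity": 2, "version": "1.0.0"},
    {"orderID": "o1", "brandName": "BrandA", "userName": "u1", "quantity": 1, "version": "1.0.0"},
    {"orderID": "o2", "brandName": "BrandB", "userName": "u2", "quantity": 3, "version": "1.0.0"},
    {"orderID": "o3", "brandName": "BrandB", "userName": "u3", "quantity": 5, "version": "2.0.0"},
]
print(summarize(batch))  # {'BrandA': (1, 3), 'BrandB': (1, 3)}
```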

...