By default, a BAM Hive script considers all the data in a column family during summarization, so data that has already been summarized is summarized again on every run. In most use cases, however, the entire data set does not need to be analyzed again in order to produce the final result. Therefore, from BAM 2.4.0 onwards, the data received by BAM can be analyzed incrementally, which uses your resources more efficiently.

This section discusses the possible use cases and how you can use this feature efficiently.

Enabling incremental analysis for your stream

You can enable incremental analysis for your stream by installing a toolbox. To populate the incremental index for a particular stream definition, you must set the enableIncrementalIndex property in the streams.properties file.

The sample below shows how to enable this. Once the property is set, all events received for this stream definition are indexed for incremental processing.

streams.definitions=defn1
streams.definitions.defn1.filename=mystream
streams.definitions.defn1.username=admin
streams.definitions.defn1.password=admin
streams.definitions.defn1.description=This is the datastream published from my BAM/CEP publisher.
streams.definitions.defn1.enableIncrementalIndex=true

Where to use incremental analysis

This section discusses the possible use cases of the incremental feature. You can add the @Incremental annotation before a query in your Hive script to have that query processed incrementally. You also need to give the annotation a name, which identifies the unique pointer from which the following query resumes processing; this avoids clashes with other Hive queries. The syntax and each attribute are described in detail below.

 

Use case 1 (Incremental processing for a stream that has had incremental indexing enabled from the beginning of the stream)

 

CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable(orderID STRING, brandName STRING, userName STRING, quantity INT,
   version STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
   "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" ,
   "cassandra.columns.mapping" =
   ":key,payload_brand, payload_user, payload_quantity, Version" );
 

@Incremental(name="salesAnalysis", tables="PhoneSalesTable", bufferTime="20")
select brandName, count(DISTINCT orderID),
   sum(quantity) from PhoneSalesTable
   where version= "1.0.0" group by brandName;

  

  • @Incremental is the annotation used for incremental processing. The query that follows this annotation is processed in an incremental manner.

  • The 'name' attribute of the annotation must be unique within a Hive script. That is, you can use the same name in different Hive scripts, and those queries are processed independently.

  • The 'tables' attribute indicates which tables are to be processed incrementally. A Hive query can use more than one table, for example with multiple JOINs, but only the tables named in the 'tables' attribute are processed incrementally. Also, only Cassandra-backed Hive tables can be processed incrementally.

  • The 'bufferTime' attribute is the time span, in milliseconds, of the most recent rows that are left out of the current Hive processing cycle. This is needed because some data may have only partially arrived, and the buffer time allows that data to arrive fully before it is processed.
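
The way the named pointer and bufferTime work together can be pictured with a small Python model. This is only an illustrative sketch under stated assumptions, not how BAM implements the feature: the function name select_incremental, the ts field, the cursors dictionary, and the exact window boundaries are all hypothetical.

```python
def select_incremental(rows, cursors, name, now_ms, buffer_ms):
    """Pick the rows for one processing cycle of the query called `name`.

    Assumed model (not BAM's actual code): each row carries an arrival
    timestamp `ts` in milliseconds, and `cursors` remembers, per name,
    the upper bound of the last processed window.
    """
    start = cursors.get(name, float("-inf"))  # first cycle: no lower bound
    end = now_ms - buffer_ms                  # skip the newest buffer_ms of data
    picked = [r for r in rows if start < r["ts"] <= end]
    cursors[name] = max(start, end)           # the next cycle resumes from here
    return picked


rows = [{"ts": 100}, {"ts": 200}, {"ts": 295}]
cursors = {}

# Cycle at t=300 with a 20 ms buffer: the row at ts=295 is held back.
first = select_incremental(rows, cursors, "salesAnalysis", 300, 20)
# Cycle at t=400: only the row that was held back is picked up.
second = select_incremental(rows, cursors, "salesAnalysis", 400, 20)
print([r["ts"] for r in first], [r["ts"] for r in second])  # [100, 200] [295]
```

In this model a second query with a different name would have its own entry in cursors, so it would start from the beginning of the data regardless of how far "salesAnalysis" has progressed.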

 

Use case 2 (Enabling incremental processing for the Cassandra column family that a Hive script inserts into)

 

CREATE EXTERNAL TABLE IF NOT EXISTS WriteTestTable
   (orderID STRING, brandName STRING, userName STRING, quantity INT,
    version STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
   "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "casandra_write_test" ,
   "cassandra.enable.incremental.process"="true",
   "cassandra.columns.mapping" =
   ":key,payload_brand, payload_user, payload_quantity, Version" );    
   
insert overwrite table WriteTestTable
   select * from PhoneSalesTable;

 

  • You need to set the "cassandra.enable.incremental.process"="true" property if you want to enable incremental processing via a Hive query.
  • After this, the Cassandra column family created by the Hive query can also be processed incrementally, in the same way as described in use case 1.

Use case 3 (Enabling incremental processing with a wide-row Hive/Cassandra table)

 

CREATE EXTERNAL TABLE IF NOT EXISTS WideRowReadTable
   (orderID STRING, colName STRING, colValue STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
   "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" ,
   "cassandra.columns.mapping" =
   ":key,:column,:value" );  
  

CREATE EXTERNAL TABLE IF NOT EXISTS WideRowWriteTableTest1
   (orderID STRING, colName STRING, colValue STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
   "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi_wide_test_1" ,
   "cassandra.columns.mapping" =
   ":key,:column, :value" );   
                             

@Incremental(name="wideRow", tables="WideRowReadTable", bufferTime="0")
insert overwrite table WideRowWriteTableTest1
   select orderId, colName, colValue from WideRowReadTable JOIN
   (select orderId as rowId from WideRowReadTable
      where colName="Version" AND colValue="1.0.0") rowIdTable
   ON WideRowReadTable.orderId=rowIdTable.rowId;

 

  • The above example copies data from one column family to another.
  • The :key, :column, :value mapping exposes a transposed view of the actual row: each combination of row key and column key becomes a row in the Hive table, which flattens the Cassandra table.
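
The transposed view can be illustrated with a short Python sketch. It only models the shape of the data; the helper names flatten_wide_rows and copy_rows_with_version and the sample values are hypothetical and are not part of BAM or Hive.

```python
def flatten_wide_rows(column_family):
    """Yield one (row key, column name, column value) tuple per stored column."""
    for row_key, columns in column_family.items():
        for col_name, col_value in columns.items():
            yield (row_key, col_name, col_value)


def copy_rows_with_version(flat_rows, version):
    """Mirror the JOIN in the query above: keep every flattened row whose
    row key has a Version column equal to `version`."""
    flat_rows = list(flat_rows)
    matching_keys = {key for key, col, val in flat_rows
                     if col == "Version" and val == version}
    return [row for row in flat_rows if row[0] in matching_keys]


# Hypothetical sample data: row key -> {column name: column value}
cf = {
    "order-1": {"payload_brand": "BrandA", "Version": "1.0.0"},
    "order-2": {"payload_brand": "BrandB", "Version": "2.0.0"},
}

for row in copy_rows_with_version(flatten_wide_rows(cf), "1.0.0"):
    print(row)
```

Each Cassandra row with N columns becomes N rows in the flattened view, which is why the query needs the self-join: the Version filter matches only one flattened row per order, and the join brings back the remaining columns of that order.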

 

Use case 4 (Incremental processing with old, already existing data)

CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable
   (orderID STRING, brandName STRING, userName STRING, quantity INT,
    version STRING) STORED BY
   'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
   WITH SERDEPROPERTIES (
     "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
   "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" ,
   "cassandra.columns.mapping" =
   ":key,payload_brand, payload_user, payload_quantity, Version" );
 

@Incremental(name="salesAnalysis", tables="PhoneSalesTable", hasNonIndexedData="true", bufferTime="0")
select brandName, count(DISTINCT orderID),
   sum(quantity) from PhoneSalesTable
   where version= "1.0.0" group by brandName;

 

  • This is similar to use case 1, but here incremental processing is enabled partway through the event stream rather than from its beginning.

  • This feature was added because some users already have data being published and may want to switch to incremental processing partway through. Therefore, if the hasNonIndexedData property is set, the first iteration of incremental processing also considers the non-indexed data. From the second iteration onwards, only the indexed data is considered.
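
The difference between the first and later iterations can be sketched in Python as follows. This is a simplified model for illustration, not BAM's implementation: the function rows_for_cycle, the ts and indexed fields, and the cursors dictionary are assumptions, and the buffer time is taken as 0 as in the query above.

```python
def rows_for_cycle(rows, cursors, name, now_ms, has_non_indexed_data=False):
    """Pick the rows for one cycle of the incremental query called `name`.

    Assumed model: `indexed` is True for events that arrived after the
    incremental index was enabled, and `cursors` holds the last processed
    time per query name.
    """
    first_cycle = name not in cursors
    start = cursors.get(name, float("-inf"))
    # Non-indexed rows are only eligible in the very first cycle,
    # and only when the flag is set.
    include_non_indexed = first_cycle and has_non_indexed_data
    picked = [
        r for r in rows
        if start < r["ts"] <= now_ms and (r["indexed"] or include_non_indexed)
    ]
    cursors[name] = now_ms
    return picked


rows = [
    {"ts": 10, "indexed": False},  # published before indexing was enabled
    {"ts": 20, "indexed": True},
]
cursors = {}

first = rows_for_cycle(rows, cursors, "salesAnalysis", 30, has_non_indexed_data=True)
rows.append({"ts": 40, "indexed": True})
second = rows_for_cycle(rows, cursors, "salesAnalysis", 50, has_non_indexed_data=True)
print([r["ts"] for r in first], [r["ts"] for r in second])  # [10, 20] [40]
```

Without the flag, the first cycle in this model would return only the row at ts=20, so the data published before indexing was enabled would never be included in the summary.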

 
