This section introduces the Apache Hive query language (HQL), and explains how to set up databases and write Hive scripts to process and analyze data stored in RDBMSs or NoSQL databases. All Hive-related configurations in BAM are included in the following files.

  • For Linux: <BAM_HOME>/repository/conf/advanced/hive-site.xml
  • For Windows: <BAM_HOME>/repository/conf/advanced/hive-site-win.xml
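Properties in these files use the standard Hadoop configuration format. For example, a property entry looks like the following (the property shown is a standard Hive setting, used here purely for illustration):

Code Block
languagexml
<property>
    <!-- Name of the Hive configuration property -->
    <name>hive.exec.scratchdir</name>
    <!-- Scratch directory used by Hive for intermediate job output -->
    <value>/tmp/hive-${user.name}</value>
</property>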

The following topics are covered:


...

Example 1: This is a basic example query that creates a virtual Hive table named ActivityDataTable, corresponding to a physical Cassandra column family.

 

Code Block
languagesql
CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
(messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
"wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
"cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner",
"cassandra.cf.name" = "org_wso2_bam_activity_monitoring" ,
"cassandra.columns.mapping" =
":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );

Note the following regarding the above query:

  • The new handler org.apache.hadoop.hive.cassandra.CassandraStorageHandler is used instead of the JDBC handler class.
  • WITH SERDEPROPERTIES is used instead of the TBLPROPERTIES clause.
  • The Cassandra storage handler class takes the following parameters for its SerDe properties.
    • cassandra.host : Host names/IP Addresses of Cassandra nodes. You can use a comma-separated list for multiple nodes in the Cassandra ring for fail-over.
    • cassandra.port : The port through which Cassandra listens to client requests.
    • cassandra.ks.name : Cassandra Keyspace name. Keyspaces are logically similar to databases in RDBMSs. The connection parameters, host, port, username and password, are declared explicitly in cassandra.host, cassandra.port, cassandra.ks.username and cassandra.ks.password respectively. The name of the keyspace is EVENT_KS by default. To change this, edit the <keySpaceName> element in the <BAM_HOME>/repository/conf/data-bridges/data-bridge-config.xml file.
    • cassandra.ks.username : Username (username@tenant_domain if in Stratos) for authenticating to the Cassandra Keyspace. If the Keyspace does not require authentication, you can omit this property.
    • cassandra.ks.password : Password for authenticating to the Cassandra Keyspace. If the Keyspace does not require authentication, you can omit this property.
    • cassandra.cf.name : Cassandra ColumnFamily name. In this example, org_wso2_bam_activity_monitoring is set as the column family name.
    • cassandra.columns.mapping : Maps the Cassandra column family keys to the Hive table fields. The mapping should be in the same order as the Hive field definitions in CREATE TABLE. So the Hive table fields messageID, sentTimestamp, activityID, version, soapHeader, soapBody and host are mapped to the column family keys (keys of key-value pairs) :key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody and meta_host respectively. This is because the column family already exists, and the Hive script only creates a mapped Hive table on top of it. :key is the unique row key available for each row in the Cassandra column family. You should map this field to a Hive table field in every Hive script.
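If you are not using a Carbon datasource, the connection parameters described above can be given explicitly in the SerDe properties instead. The following sketch illustrates this; the host, port and credentials shown (127.0.0.1, 9160, admin/admin) are placeholder values you should replace with those of your own Cassandra ring.

Code Block
languagesql
CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
(messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
"cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160",
"cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "admin",
"cassandra.ks.password" = "admin",
"cassandra.cf.name" = "org_wso2_bam_activity_monitoring",
"cassandra.columns.mapping" =
":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );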

...

Info

If you are using a Cassandra version earlier than 1.1.3, you need to add the following property to the Hive queries you write: "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner". An example Hive query with this property is shown below.

Code Block
languagesql
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable 
    (orderID STRING, brandName STRING, userName STRING, quantity INT,
     version STRING) STORED BY 
    'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' 
    WITH SERDEPROPERTIES (
     "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner",
    "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" , 
    "cassandra.columns.mapping" = 
    ":key,payload_brand, payload_user, payload_quantity, Version" );


...

Datasource names can also be used for the Cassandra storage handler in Hive scripts. A predefined datasource named "WSO2BAM_CASSANDRA_DATASOURCE" is available for this purpose. You can modify "WSO2BAM_CASSANDRA_DATASOURCE" by editing the <BAM_HOME>/repository/conf/datasources/bam-datasources.xml file. Given below is the datasource configuration for Cassandra.

...

After configuring a datasource for Cassandra, you can simply define a property named "wso2.carbon.datasource.name" in your Hive scripts, so that the following properties are no longer required:
    1) cassandra.host
    2) cassandra.port
    3) cassandra.ks.name
    4) cassandra.ks.username
    5) cassandra.ks.password

Here is an example Hive script that uses a datasource name with the Cassandra storage handler.

Code Block
languagesql
CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
    (messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
    STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES (
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "org_wso2_bam_activity_monitoring" ,
    "cassandra.columns.mapping" =
    ":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );

...

BAM uses the JDBC storage handler to connect to relational databases from Hive. You can specify the JDBC storage handler using the org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler class in the STORED BY clause.

...

    1. 'wso2.carbon.datasource.name' = 'WSO2BAM_DATASOURCE' : The datasource name provides connection-related information such as the username, password and port. WSO2BAM_DATASOURCE is an H2 datasource that comes by default with BAM. You can define it in the <BAM_HOME>/repository/conf/datasources/bam-datasources.xml file. Alternatively, you can define the database connection parameters explicitly. We discuss the latter approach in Method 2 below.
    2. 'hive.jdbc.update.on.duplicate' = 'true' : Used to overwrite existing data rows in the table when it is updated with an entry that has the same primary key.
    3. 'hive.jdbc.primary.key.fields' = 'messageRowID' : Used to set primary key fields to the Hive table. In this example, only messageRowID is set as the primary key. But, you can set more than one primary key separated by comma. For example, 'hive.jdbc.primary.key.fields' = 'primarykey1, primarykey2, primarykey3'.
    4. 'hive.jdbc.table.create.query' = 'CREATE TABLE... : This is the only place where you should use real SQL commands that are run in the actual RDBMS. The SQL command to create the H2 table is: CREATE TABLE ActivitySummary (messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, sentTimestamp BIGINT, bamActivityID VARCHAR(40), soapHeader TEXT, soapBody TEXT, host VARCHAR(25)).
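Putting the four properties above together, a Hive table mapped to the ActivitySummary RDBMS table could be defined as follows. This is a sketch based on the property values listed above; the Hive-side table and field names mirror the columns of the CREATE TABLE statement.

Code Block
languagesql
CREATE EXTERNAL TABLE IF NOT EXISTS ActivitySummaryTable
(messageRowID STRING, sentTimestamp BIGINT, bamActivityID STRING, soapHeader STRING, soapBody STRING, host STRING)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'wso2.carbon.datasource.name' = 'WSO2BAM_DATASOURCE',
'hive.jdbc.update.on.duplicate' = 'true',
'hive.jdbc.primary.key.fields' = 'messageRowID',
'hive.jdbc.table.create.query' =
'CREATE TABLE ActivitySummary (messageRowID VARCHAR(100) NOT NULL PRIMARY KEY,
 sentTimestamp BIGINT, bamActivityID VARCHAR(40), soapHeader TEXT, soapBody TEXT, host VARCHAR(25))' );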

...

  • 'mapred.jdbc.driver.class' = 'org.h2.Driver' : The JDBC driver class name required for the database connection. You should add the JDBC driver JAR file to the <BAM_HOME>/repository/components/lib/ directory. The H2 JDBC driver comes by default with WSO2 BAM.
  • 'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE' : The JDBC connection URL to the database.
  • 'mapred.jdbc.username' = 'wso2carbon' : JDBC Username.
  • 'mapred.jdbc.password' = 'wso2carbon' : JDBC Password.
  • hive.jdbc.update.on.duplicate : If set to true, overwrites an existing record in the database when a new record with the same primary key is written.
  • hive.jdbc.primary.key.fields : Specifies the primary key fields of the database table.
  • hive.jdbc.table.create.query : A CREATE TABLE query with database-specific syntax. Useful when the database requires specific syntax for creating tables.
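Combining the explicit connection parameters above (Method 2), the table definition might look like the following sketch. The driver class, URL and credentials are the sample H2 values listed above; replace them with those of your own database.

Code Block
languagesql
CREATE EXTERNAL TABLE IF NOT EXISTS ActivitySummaryTable
(messageRowID STRING, sentTimestamp BIGINT, bamActivityID STRING, soapHeader STRING, soapBody STRING, host STRING)
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
TBLPROPERTIES (
'mapred.jdbc.driver.class' = 'org.h2.Driver',
'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE',
'mapred.jdbc.username' = 'wso2carbon',
'mapred.jdbc.password' = 'wso2carbon',
'hive.jdbc.update.on.duplicate' = 'true',
'hive.jdbc.primary.key.fields' = 'messageRowID' );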

...