
Creating Hive Queries to Analyze Data

This section introduces you to the Apache Hive query language (HiveQL) and explains how to set up databases and write Hive scripts to process and analyze data stored in RDBMS or NoSQL databases. All Hive-related configurations in BAM are included in the following files.

  • For Linux: <BAM_HOME>/repository/conf/advanced/hive-site.xml
  • For Windows: <BAM_HOME>/repository/conf/advanced/hive-site-win.xml

Hive queries in BAM

Apache Hive is a data warehouse system for Hadoop. It facilitates easy data summarization, ad-hoc queries, and the analysis of large data sets stored in Hadoop-compatible file systems. Hive provides a mechanism to project structure onto data, and query the data using an SQL-like language called HiveQL.

This is not a complete Hive learning resource; only the Hive syntax used in WSO2 BAM is described here. If you are a beginner, we recommend that you also refer to other material on Hive, including the online Apache Hive project documentation.

Business information processed by BAM is ideally stored in an RDBMS or a NoSQL data store. A few examples are as follows:

  • RDBMS tables : For example, an H2 database table or a MySQL table.
  • NoSQL column families : For example, Cassandra column families.

We use Hive scripts to analyze the collected data and derive meaningful information from it before storing it in these RDBMS or NoSQL data stores. However, Hive queries are only compatible with Hive tables. Therefore, before writing a Hive script against an RDBMS table or a Cassandra column family, you should create virtual Hive tables based on the actual RDBMS or NoSQL data stores.

When you run a Hive script, it refers to the RDBMS table structures or column families where the processed information should be stored, and defines its own virtual tables by wrapping the existing physical tables. This virtual table structure is similar to SQL tables defined in an RDBMS. For example, when Hive creates a virtual table by wrapping a MySQL table, the Hive table gets a field corresponding to each data field in the MySQL table. You should individually map each data field you want in the Hive table to a field in the MySQL table. Unmapped fields will not be available or visible in the Hive table.

The Hive engine maintains Hive tables by keeping a set of metadata about each actual table or column family. Updating or deleting the actual table or column family using third-party software will not update the metadata kept in the virtual Hive table, so we recommend performing such operations through Hive instead. There is no need to create the SQL database using a separate tool or software, because creating a Hive table with the Hive scripts given below automatically creates the required SQL tables for you.

Creating Hive tables for various data sources

BAM uses storage handlers to connect to various data sources from Hive. Currently, there are two main storage handlers in BAM: the Cassandra storage handler (org.apache.hadoop.hive.cassandra.CassandraStorageHandler) for Cassandra, and the JDBC storage handler (org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler) for relational data stores.

When Hive tables are created, you can specify data source connection parameters in the CREATE TABLE query, as SERDEPROPERTIES (for NoSQL) or TBLPROPERTIES (for RDBMS). Let's take a look at a few examples of each.

Creating a Hive table for Cassandra

In NoSQL databases like Cassandra, column families are logically similar to tables in an RDBMS, and a set of key-value pairs in a row is similar to the data fields of an RDBMS record. The rows of a single column family can contain an arbitrary number of keys, so you should select a set of them before mapping to fields of the virtual Hive table. BAM uses the Cassandra storage handler to connect to Cassandra databases from Hive. You define it using the org.apache.hadoop.hive.cassandra.CassandraStorageHandler class in the STORED BY clause. Let's look at a few examples.

Example 1: This is a basic example query that creates a virtual Hive table named ActivityDataTable corresponding to a physical Cassandra column family.


CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
(messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
"wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
"cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner",
"cassandra.cf.name" = "org_wso2_bam_activity_monitoring" ,
"cassandra.columns.mapping" =
":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );

Note the following regarding the above query:

  • The handler org.apache.hadoop.hive.cassandra.CassandraStorageHandler is used instead of the JDBC handler class.
  • WITH SERDEPROPERTIES is used instead of the TBLPROPERTIES clause.
  • The Cassandra storage handler class takes the following parameters as its SerDe properties (a sketch that sets the connection parameters explicitly follows this list).
    • cassandra.host : Host names/IP addresses of Cassandra nodes. You can use a comma-separated list to include multiple nodes of the Cassandra ring for fail-over.
    • cassandra.port : The port through which Cassandra listens to client requests.
    • cassandra.ks.name : The Cassandra keyspace name. Keyspaces are logically similar to databases in an RDBMS. The connection parameters host, port, username and password are declared explicitly in cassandra.host, cassandra.port, cassandra.ks.username and cassandra.ks.password respectively. The name of the keyspace is EVENT_KS by default. To change this, edit the <keySpaceName> element in the <BAM_HOME>/repository/conf/data-bridges/data-bridge-config.xml file.
    • cassandra.ks.username : The username (username@tenant_domain if in Stratos) for authenticating with the Cassandra keyspace. If no authentication is required to connect to the keyspace, you can omit this.
    • cassandra.ks.password : The password for authenticating with the Cassandra keyspace. If no authentication is required to connect to the keyspace, you can omit this.
    • cassandra.cf.name : The Cassandra column family name. In this example, org_wso2_bam_activity_monitoring is set as the column family name.
    • cassandra.columns.mapping : Maps the Cassandra column family keys to the Hive table fields, in the same order as the Hive field definitions in CREATE TABLE. So the Hive table fields messageID, sentTimestamp, activityID, version, soapHeader, soapBody and host are mapped to the column family keys (the keys of the key-value pairs) named :key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody and meta_host. This is because the column family already exists; the Hive script only creates the mapped Hive table on top of it. :key is the unique row key available for each row in the Cassandra column family, and you should map it to a Hive table field in every Hive script.
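
For reference, here is Example 1 rewritten as a sketch with the connection parameters set explicitly instead of through a Carbon datasource. The host 127.0.0.1, port 9160 and the admin/admin credentials are the BAM defaults used elsewhere in this guide; replace them with the values of your own Cassandra ring.

-- sketch: explicit connection parameters in place of wso2.carbon.datasource.name
CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
    (messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
    STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES (
    "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "org_wso2_bam_activity_monitoring",
    "cassandra.columns.mapping" =
    ":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );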

Example 2: The query below also maps a set of column family columns to a Hive table. The column family row key is mapped to orderID; :key is a special keyword in the column mapping that refers to the column family row key.

Hive_Schema = CREATE TABLE PhoneSalesTable (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING)
cassandra.columns.mapping = ":key,payload_brand, payload_user, payload_quantity, Version"

Example 3: This sample query maps both static and variable columns in a column family to a Hive table. map: specifies that all columns in a Cassandra row that do not belong to the fixed fields specified in the column mapping are taken as a property map with string keys and values. This works even if mixed data types are present among the variable properties.

You can use any name that ends with a colon instead of the map: identifier. For example, propertyBag: works just as well, as long as the name ends with a colon.

Hive_Schema = CREATE TABLE PhoneSalesTable (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING, properties map<string,string>) 
cassandra.columns.mapping = ":key,payload_brand, payload_user, payload_quantity, Version, map:"
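
Assembled into a complete statement, Example 3 could look like the sketch below. It borrows the WSO2BAM_CASSANDRA_DATASOURCE and the org_wso2_bam_phone_retail_store_kpi column family from the other phone-sales examples on this page; adjust both to your own setup.

-- sketch: map: gathers every column not named in the fixed mapping into the properties field
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable
    (orderID STRING, brandName STRING, userName STRING, quantity INT,
     version STRING, properties map<string,string>)
    STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES (
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi",
    "cassandra.columns.mapping" =
    ":key, payload_brand, payload_user, payload_quantity, Version, map:" );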

Hive loads the entire row into memory to discover the contents of the map. This can cause out-of-memory issues in Hive jobs. Therefore, we do not recommend using the Map data type if the column family is based on wide rows.

Example 4: This sample query maps an entire column family as key-value pairs (a transposed table). It maps all column names to one field in the Hive table and the corresponding column values to another. This is useful for time-series data stored in Cassandra as wide rows.

Hive_Schema = CREATE TABLE PhoneSalesTimeSeriesTable (itemId STRING, quarter STRING, salesFigure INT)
cassandra.columns.mapping = ":key, :column, :value"
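
As a complete statement, this transposed mapping could be written as follows. This is a sketch: the column family name phone_sales_time_series is hypothetical, so substitute the wide-row column family your data is actually published to.

-- sketch: transposed mapping, one Hive row per (row key, column name, column value) triple
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTimeSeriesTable
    (itemId STRING, quarter STRING, salesFigure INT)
    STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES (
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "phone_sales_time_series",
    "cassandra.columns.mapping" = ":key, :column, :value" );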

You can map a Cassandra super column family as follows. 

Hive_Schema = CREATE TABLE PhoneSalesTimeSeriesTable (itemId STRING, quarter STRING, month STRING, salesFigure INT)
cassandra.columns.mapping = ":key, :column, :subcolumn, :value"

Other than the Map data type, BAM does not currently support any other complex Hive data types with the Cassandra storage handler.

If you are using a Cassandra version below 1.1.3, you need to add the following property to the Hive queries you write: "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner". An example Hive query with this property is as follows.

CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable 
    (orderID STRING, brandName STRING, userName STRING, quantity INT,
     version STRING) STORED BY 
    'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' 
    WITH SERDEPROPERTIES (
     "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.partitioner" = "org.apache.cassandra.dht.RandomPartitioner",
    "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" , 
    "cassandra.columns.mapping" = 
    ":key,payload_brand, payload_user, payload_quantity, Version" );


Using a datasource name for Cassandra storage handler in Hive scripts

Datasource names can also be used with the Cassandra storage handler in Hive scripts. A predefined datasource named "WSO2BAM_CASSANDRA_DATASOURCE" is available for this purpose. You can modify "WSO2BAM_CASSANDRA_DATASOURCE" by editing the <BAM_HOME>/repository/conf/datasources/bam-datasources.xml file. Given below is the datasource configuration for Cassandra.

<datasource>
    <name>WSO2BAM_CASSANDRA_DATASOURCE</name>
    <description>The datasource used for Cassandra data</description>
    <definition type="RDBMS">
        <configuration>
            <url>jdbc:cassandra://localhost:9160/EVENT_KS</url>
            <username>admin</username>
            <password>admin</password>
        </configuration>
    </definition>
</datasource>

Instead of a single JDBC URL (i.e., jdbc:cassandra://localhost:9160/EVENT_KS), you can use a comma-separated list of JDBC URLs when load balancing is required (for example, jdbc:cassandra://node1:9160/EVENT_KS,jdbc:cassandra://node2:9160/EVENT_KS, where node1 and node2 are illustrative host names).

After configuring a datasource for Cassandra, you can simply define a property named "wso2.carbon.datasource.name" in your Hive scripts, so that the following properties are no longer required:
    1) cassandra.host
    2) cassandra.port
    3) cassandra.ks.name
    4) cassandra.ks.username
    5) cassandra.ks.password

Here is an example Hive script which uses the datasource name with the Cassandra storage handler.

CREATE EXTERNAL TABLE IF NOT EXISTS ActivityDataTable
    (messageID STRING, sentTimestamp BIGINT, activityID STRING, version STRING, soapHeader STRING, soapBody STRING, host STRING)
    STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES (
    "wso2.carbon.datasource.name" = "WSO2BAM_CASSANDRA_DATASOURCE",
    "cassandra.cf.name" = "org_wso2_bam_activity_monitoring" ,
    "cassandra.columns.mapping" =
    ":key, payload_timestamp, correlation_bam_activity_id, Version, payload_SOAPHeader, payload_SOAPBody, meta_host" );

Creating a Hive table for RDBMS

BAM uses the JDBC storage handler to connect to relational databases from Hive. You specify the JDBC storage handler using the org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler class in the STORED BY clause.

Two methods are discussed below.

Method 1 : Defining the data source in the bam-datasources.xml file

Here is an example query that creates a virtual Hive table named ActivitySummaryTable corresponding to a physical H2 table named ActivitySummary. Note that you do not have to match the column names in the Hive table to those in H2; the query maps each column in Hive to a column in the H2 table based on the order in which they are defined. You can also find the query below in the BAM Activity Monitoring Toolbox.


CREATE EXTERNAL TABLE IF NOT EXISTS ActivitySummaryTable(
    messageRowID STRING, sentTimestamp BIGINT, bamActivityID STRING, soapHeader STRING, soapBody STRING, host STRING)
    STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
    TBLPROPERTIES (
    'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
    'hive.jdbc.update.on.duplicate' = 'true' ,
    'hive.jdbc.primary.key.fields' = 'messageRowID' ,
    'hive.jdbc.table.create.query' =
    'CREATE TABLE ActivitySummary (messageRowID VARCHAR(100) NOT NULL PRIMARY KEY,
     sentTimestamp BIGINT, bamActivityID VARCHAR(40), soapHeader TEXT, soapBody TEXT, host VARCHAR(25))' );

Note the following regarding the above query:

  • The CREATE EXTERNAL TABLE IF NOT EXISTS command ensures that ActivitySummaryTable is created only once. When the script is executed repeatedly, the table is not re-created, which would otherwise reduce performance.
  • The messageRowID, sentTimestamp, bamActivityID, soapHeader, soapBody and host parameters used in the Hive table are mapped to the real data fields messageRowID, sentTimestamp, bamActivityID, soapHeader, soapBody and host in the H2 database.
  • org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler is the storage handler class used for the H2 database connection. It comes by default with the WSO2 BAM distribution, so you do not have to copy it manually.
  • When you create a Hive table wrapping an RDBMS table, you should specify several database-related properties with TBLPROPERTIES. They are as follows:
    1. 'wso2.carbon.datasource.name' = 'WSO2BAM_DATASOURCE' : The data source name provides connection-related information such as the username, password and port. WSO2BAM_DATASOURCE is an H2 data source that comes by default with BAM. You can define it in the <BAM_HOME>/repository/conf/datasources/bam-datasources.xml file. Alternatively, you can define the database connection parameters explicitly; we discuss that approach in Method 2 below.
    2. 'hive.jdbc.update.on.duplicate' = 'true' : Overwrites existing data rows in the table when it is updated with an entry that has the same primary key.
    3. 'hive.jdbc.primary.key.fields' = 'messageRowID' : Sets the primary key fields of the Hive table. In this example, only messageRowID is set as the primary key, but you can set more than one primary key separated by commas. For example, 'hive.jdbc.primary.key.fields' = 'primarykey1, primarykey2, primarykey3'.
    4. 'hive.jdbc.table.create.query' = 'CREATE TABLE... : This is the only place where you use real SQL commands that run on the actual RDBMS. The SQL command that creates the H2 table is: CREATE TABLE ActivitySummary (messageRowID VARCHAR(100) NOT NULL PRIMARY KEY, sentTimestamp BIGINT, bamActivityID VARCHAR(40), soapHeader TEXT, soapBody TEXT, host VARCHAR(25)).

The table below shows how the query maps the data types of the Hive table fields to H2 column types.

Hive        H2
STRING      VARCHAR(100). The number of characters should be sufficiently large.
STRING      TEXT. This is specific to H2; other database types can have different types for declaring strings of unlimited length.
INT         INT
SMALLINT    SMALLINT
BIGINT      BIGINT
DOUBLE      DOUBLE

Method 2 : Defining the connection parameters inline in the Hive script

The previous section describes how to define a data source for the Hive table in the bam-datasources.xml file. Method 2 describes how to define the RDBMS connection parameters inline in the Hive script as properties.

The script given below is based on an H2 table. You can also find it in the KPI Phone Retail Store sample of WSO2 BAM.


CREATE EXTERNAL TABLE IF NOT EXISTS UserTable(
    name STRING, totalOrders INT, totalQuantity INT) 
    STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler'
    TBLPROPERTIES ( 
    'mapred.jdbc.driver.class' = 'org.h2.Driver' , 
    'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE' , 
    'mapred.jdbc.username' = 'wso2carbon' , 
    'mapred.jdbc.password' = 'wso2carbon' , 
    'hive.jdbc.update.on.duplicate' = 'true' , 
    'hive.jdbc.primary.key.fields' = 'name' , 
    'hive.jdbc.table.create.query' = 
    'CREATE TABLE UserSummary (name VARCHAR(100) NOT NULL PRIMARY KEY,
     totalOrders  INT, totalQuantity INT)' );

Note the following regarding the above query.

  • 'mapred.jdbc.driver.class' = 'org.h2.Driver' : The JDBC driver class name required for the database connection. You should add the JDBC driver JAR file to the <BAM_HOME>/repository/components/lib/ directory. The H2 JDBC driver comes by default with WSO2 BAM.
  • 'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE' : The JDBC connection URL of the database.
  • 'mapred.jdbc.username' = 'wso2carbon' : The JDBC username.
  • 'mapred.jdbc.password' = 'wso2carbon' : The JDBC password.
  • hive.jdbc.update.on.duplicate : If set to true, an existing record with the same primary key is overwritten by the incoming record.
  • hive.jdbc.primary.key.fields : Specifies the primary key fields of the database table.
  • hive.jdbc.table.create.query : Lets you provide a create-table query with database-specific syntax. This is useful when the database has its own syntax for creating tables.

In the create-table query, the names of the actual database table columns should match the Hive table fields. The Hive table name and the actual database table name should also match.

  • wso2.carbon.datasource.name : Specifies a Carbon datasource for connecting to the database. When a datasource is used, you do not have to specify the JDBC driver, JDBC URL, username and password properties. This method is recommended, as it keeps hard-coded database credentials out of the analytics scripts.

Now that we have explained how to create virtual Hive tables for actual RDBMS or NoSQL databases, let's take a look at how to update and maintain these Hive tables.

Analyzing data using Hive tables

The purpose of creating virtual Hive tables by mapping actual tables is to execute Hive queries on them. You can create Hive tables on both RDBMS and NoSQL tables. Hive internally maintains metadata about the real table each one is mapped to.

After creating a Hive table, the next step is to insert and process data. You can write Hive queries to retrieve data stored in databases, process the data in virtual Hive tables (averaging, aggregating, summing up etc.) to produce meaningful information, and then transfer this information to either NoSQL or RDBMS databases. BAM then retrieves this information and displays it through various dashboards. This is similar to executing SQL queries on an RDBMS table, except that instead of the UPDATE command, Hive uses the INSERT OVERWRITE command. Given below is an example that simply inserts data into a virtual Hive table.


insert overwrite table ActivitySummaryTable
    select messageID, sentTimestamp, activityID, soapHeader, soapBody, host
    from ActivityDataTable
    where version= "1.0.0";
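
Hive queries can also aggregate while inserting. The sketch below, loosely modeled on the KPI Phone Retail Store sample, summarizes the PhoneSalesTable defined earlier into the UserTable from Method 2. The exact derivations of totalOrders and totalQuantity are assumptions, so adapt them to your schema.

-- sketch: per-user sales summary; totalOrders and totalQuantity are assumed definitions
insert overwrite table UserTable
    select userName,
        count(DISTINCT orderID) as totalOrders,
        sum(quantity) as totalQuantity
    from PhoneSalesTable
    group by userName;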

Here's a more comprehensive example with three Hive commands. This query uses several aggregation functions to process and analyze data in virtual Hive tables. Some of them are as follows:

  • sum - Total of all records.
  • avg - Average of all records.
  • min - Minimum of records.
  • max - Maximum of records.

You can find more Hive functions here: https://cwiki.apache.org/Hive/languagemanual-udf.html.

CREATE EXTERNAL TABLE IF NOT EXISTS AppServerStats
    (key STRING, service_name STRING, operation_name STRING,
     request_count INT, response_count INT, fault_count INT,
     response_time BIGINT, remote_address STRING,
     payload_timestamp BIGINT, host STRING)
    STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
    WITH SERDEPROPERTIES (
    "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "bam_service_data_publisher",
    "cassandra.columns.mapping" =
    ":key,payload_service_name,payload_operation_name,payload_request_count,payload_response_count,payload_fault_count,payload_response_time,meta_remote_address,payload_timestamp,meta_host" );
 
CREATE EXTERNAL TABLE IF NOT EXISTS AppServerStatsPerMinute(host STRING, service_name STRING, operation_name STRING, total_request_count INT,total_response_count INT,
total_fault_count INT,avg_response_time DOUBLE,min_response_time BIGINT,max_response_time BIGINT, year SMALLINT,month SMALLINT,day SMALLINT,hour SMALLINT,minute SMALLINT, time STRING) 
STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES ( 
'wso2.carbon.datasource.name'='WSO2BAM_DATASOURCE',
'hive.jdbc.update.on.duplicate' = 'true',
'hive.jdbc.primary.key.fields' = 'host,service_name,operation_name,year,month,day,hour,minute',
'hive.jdbc.table.create.query' = 'CREATE TABLE AS_STATS_SUMMARY_PER_MINUTE ( host VARCHAR(100) NOT NULL, 
service_name VARCHAR(150),operation_name VARCHAR(150), 
total_request_count INT,total_response_count INT,
total_fault_count INT,avg_response_time DOUBLE,min_response_time BIGINT,max_response_time BIGINT, year SMALLINT, month SMALLINT, day SMALLINT, hour SMALLINT, minute SMALLINT, time VARCHAR(30))' );
 
INSERT OVERWRITE TABLE AppServerStatsPerMinute
    select host, service_name, operation_name,
        sum(request_count) as total_request_count,
        sum(response_count) as total_response_count,
        sum(fault_count) as total_fault_count,
        avg(response_time) as avg_response_time,
        min(response_time) as min_response_time,
        max(response_time) as max_response_time,
        year(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')) as year,
        month(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')) as month,
        day(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')) as day,
        hour(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')) as hour,
        minute(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')) as minute,
        concat(substring(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss'),0,16),':00') as time
    from AppServerStats
    group by
        year(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')),
        month(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')),
        day(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')),
        hour(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')),
        minute(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss.SSS')),
        substring(from_unixtime(cast(payload_timestamp/1000 as BIGINT),'yyyy-MM-dd HH:mm:ss'),0,16),
        host, service_name, operation_name;