Hive Query Language
Let's take a look at some high-level details of the Hive Query Language (HQL). HQL is very similar to SQL. Following references are good starting points:
- Hive Language Manual (https://cwiki.apache.org/confluence/display/Hive/LanguageManual)
- Hive Tutorial (https://cwiki.apache.org/Hive/tutorial.html)
Hive Storage Handlers
Hive uses Storage Handlers to connect to various data sources. Currently, two main Storage Handlers are used to connect to BAM Cassandra database and various relational data stores. Data source connection parameters are provided at table-creation time within the 'CREATE TABLE' query as Hive SERDEPROPERTIES or TBLPROPERTIES. This is shown in the two examples below.
CREATE EXTERNAL TABLE IF NOT EXISTS PhoneSalesTable (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING) STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1" , "cassandra.port" = "9160" , "cassandra.ks.name" = "EVENT_KS" , "cassandra.ks.username" = "admin" , "cassandra.ks.password" = "admin" , "cassandra.cf.name" = "org_wso2_bam_phone_retail_store_kpi" , "cassandra.columns.mapping" = ":key,payload_brand, payload_user, payload_quantity, Version" );
CREATE EXTERNAL TABLE IF NOT EXISTS PhonebrandTable(brand STRING, totalOrders INT, totalQuantity INT) STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES ( 'mapred.jdbc.driver.class' = 'org.h2.Driver' , 'mapred.jdbc.url' = 'jdbc:h2:repository/database/samples/WSO2CARBON_DB;AUTO_SERVER=TRUE' , 'mapred.jdbc.username' = 'wso2carbon' , 'mapred.jdbc.password' = 'wso2carbon' , 'hive.jdbc.update.on.duplicate' = 'true' , 'hive.jdbc.primary.key.fields' = 'brand' , 'hive.jdbc.table.create.query' = 'CREATE TABLE brandSummary (brand VARCHAR(100) NOT NULL PRIMARY KEY, totalOrders INT, totalQuantity INT)' );
Cassandra Storage Handler
Cassandra Storage Handler is specified using 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' class in the STORED BY clause. It takes following parameters for it's SerDe properties.
- cassandra.host - Host names/IP Addresses of Cassandra nodes. You can use a comma separated list for specifying multiple nodes in the Cassandra ring for fail-over.
- cassandra.port - The port through which Cassandra listens to client requests.
- cassandra.ks.name - Cassandra Keyspace name.
- cassandra.ks.username - Username (username@tenant_domain if in Stratos) for authenticating Cassandra Keyspace. This is optional if no authentication is required to the Keyspace to be connected.
- cassandra.ks.password -Â Password for authenticating with Cassandra Keyspace. This is optional if no authentication is required to the Keyspace to be connected.
- cassandra.cf.name - Cassandra ColumnFamily name
- cassandra.columns.mapping - Mapping between Cassandra columns and Hive table fields. Needs to be in same order as Hive field definitions in CREATE TABLE.
Example 1: Mapping a set of ColumnFamily columns to Hive table. ColumnFamily row key is mapped to orderID. ":key" is a special keyword within column mapping referring to ColumnFamily row key.
Hive_Schema = CREATE TABLE PhoneSalesTable (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING) cassandra.columns.mapping = ":key,payload_brand, payload_user, payload_quantity, Version"
Example 2:Â Mapping a set of static columns and a variable set of columns in ColumnFamily to Hive table. "map:" specifies that all columns in a Cassandra row not belonging to the specified fixed fields in the column mapping should be taken as a property map with string key values. This works if mixed data types are present within the variable properties.
The "map:" identifier can be anything that ends in a colon. For example, a value like "propertyBag:" can be used instead of "map:".
Hive_Schema = CREATE TABLE PhoneSalesTable (orderID STRING, brandName STRING, userName STRING, quantity INT, version STRING, properties map<string,string>)Â cassandra.columns.mapping = ":key,payload_brand, payload_user, payload_quantity, Version, map:" );
Hive loads the entire row to memory for discovering the contents of the Map. As this might cause out of memory errors within Hive jobs, it is not recommended to use Map data type if the column family is based on wide rows.
Example 3:Â Mapping the whole ColumnFamily as key value pairs (Transposed table). In this example, we map all column names to a field in Hive table and corresponding column values to another. This is useful for time series data stored within Cassandra as wide rows.
Hive_Schema = CREATE TABLE PhoneSalesTimeSeriesTable (itemId STRING, quarter STRING, salesFigure INT) cassandra.columns.mapping = ":key, :column, :value"
You can map a Cassandra super column family as follows.Â
Hive_Schema = CREATE TABLE PhoneSalesTimeSeriesTable (itemId STRING, quarter STRING, month STRING, salesFigure INT) cassandra.columns.mapping = ":key, :column, :subcolumn, :value"
Currently no complex Hive data types are supported with Cassandra Storage Handler other than the map data type.
Â
JDBC Storage Handler
JDBC Storage Handler is used to connect to relational databases from Hive. You can specify JDBC Storage Handler using 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' class in STORED BY clause. JDBC Storage Handler takes following parameters for its table properties.
- mapred.jdbc.driver.class - JDBC driver class name. Driver jar should be present in Carbon Classpath.
- mapred.jdbc.url - JDBC connection URL.
- mapred.jdbc.username - JDBC username.
- mapred.jdbc.password - JDBC password.
- hive.jdbc.update.on.duplicate - Updates the record in the database on already existing records with current records primary key if true.
- hive.jdbc.primary.key.fields - Specifies the primary keys of database table.
- hive.jdbc.table.create.query - If the database has specific syntax for creating tables, a 'create table' query with database-specific syntax can be provided here.
The names of the actual database table columns and Hive table fields should match in 'create table' query.
- wso2.carbon.datasource.name - Specifies a Carbon Datasource for connecting to the database. When a data source is used, you do not have to specify the properties of JDBC driver, JDBC URL, username and password. This method is recommended as hard-coded database credentials can be left out from the analytic scripts.
Extensions
In addition to normal Hive queries, an extension is added to analytic scripts so that you can execute any arbitrary Java code as a part of script execution. This is useful to parameterize Hive scripts during each Hive query execution run.
Extend the extension class from AbstractHiveAnalyzer found in WSO2 SVN. In order to write an extension, add the following under maven dependencies.
The interface of AbstractHiveAnalyzer is shown below.
Within the abstract method implementation, you can use inherited property methods to set parameters to Hive Configuration used during the particular Hive script execution run. For example, if a property named "lastRunTime" is set using setProperty method, it will be available in subsequent Hive queries within the script with notation {hiveconf:lasRunTime}. You can dynamically populate this "lastRunTime" property within each run of the script by looking up from the registry, database etc. This is useful in maintaining state between two executions of a Hive script and using them subsequently.
In order to use the custom Java implementation, drop the jar containing the class to <BAM_HOME>/repository/components/lib and restart the server if already running.
The following syntax will help in using the implementation.
class 'Fully qualified class name of the implementation';
Generally, you can include this line of code in any place of the script according to your requirement. For example, if you want some pre-processing done, add it at the beginning of the script. If you want some post-processing to be done after normal Hive queries, add it at the end.