Partially-Distributed, Highly-Available BAM Setup
Shown in the diagram below is an example deployment of a high-availability BAM setup without distributed Hadoop. In this diagram, several WSO2 ESBs work as data agents, passing data to the BAM setup for analysis and summarization. For demonstration purposes, we have used three BAM nodes in this setup, but you can extend it to add as many nodes as needed.
WSO2 BAM has three main components: data gathering, analysis, and presentation. Each component is explained in detail in About BAM. We can take each of these components in the three BAM nodes and deploy them in separate clusters as shown in the diagram below:
Figure 1: BAM distributed, clustered setup
This setup persists and processes data in a distributed manner achieving high scalability and high availability. However, it is a partially-distributed setup where data collection, presentation layers and Hive script execution are not distributed. For a more distributed setup, see Fully-Distributed, High-Availability BAM Setup.
Given below is an explanation of each component in the above diagram.
Data Receivers: This includes data agents such as WSO2 Application Server (or any other service-hosting product) or WSO2 ESB capturing and transferring data to subscribed storage units. The default storage unit is the Cassandra instance that ships with WSO2 BAM. You can set up a Cassandra cluster as well.
This setup does not have data receiver components deployed in a distributed manner. Instead, each node has its own standalone data receiver, which captures data from agents and stores it in the Cassandra cluster. If one node fails, data agents will not be able to send data to the storage unit through that node's receiver.
Cassandra Cluster: Data that comes to BAM through data receivers is usually stored in the default Cassandra database. Figure 1 above shows how the Cassandra databases of all three BAM nodes are deployed in a cluster. This ensures that even if one node fails, data can be received and stored in other databases in the cluster, and also ensures high availability of data to run the Hive scripts on.
Data Analyzer Cluster: The data analyzer component of each BAM node uses Hive query language scripts to retrieve data from the Cassandra cluster, process the data into meaningful information, and save the information in an RDBMS. In this example, we use MySQL as the RDBMS. You get an H2 database with WSO2 BAM by default, but it is not recommended for a high-volume production setting. The Analyzer components in node2 and node3 are clustered in this setup. The cluster uses Apache Hadoop to distribute processing to multiple nodes that run in parallel. The Analyzer component in node1 is not used for data processing in this particular example.
Although BAM uses Apache Hadoop's MapReduce technology underneath, you do not have to write complex Hadoop jobs to process data. BAM decouples you from these underlying complexities and enables you to write data processing queries in the SQL-like Hive query language. Hive provides the right level of abstraction from Hadoop while internally submitting the analytic jobs to Hadoop. It either spawns a Hadoop JVM internally or delegates to a Hadoop cluster.
Zookeeper Cluster: Apache Zookeeper manages the coordination required by the nodes in the Analyzer cluster when running Hive scripts. Zookeeper can run separately or embedded within BAM. In this example setup, we have clustered three Zookeeper instances, one running on each node.
Follow the steps below to configure this deployment.
Configuring data analyzer cluster
The data analyzer cluster uses the Registry to store metadata related to Hive scripts and scheduled tasks. It uses Apache Zookeeper (version 3.3.6 in this example) to handle the coordination required by the nodes in the Analyzer cluster when running Hive scripts. These settings ensure high availability using a failover mechanism so that if one node fails, the rest can take up its load and complete the task. The diagram below depicts this setup:
Figure 3: BAM data analyzer cluster
Let's see how to configure the analyzer cluster with node2 and node3.
- Download and extract WSO2 BAM to node2 and node3. These BAM nodes work as a cluster for executing and load-balancing Hive scripts.
Configuring registry sharing
- Place the MySQL connector JAR inside the <BAM_HOME>/repository/components/lib folder. Add the following datasource configuration to the <BAM_HOME>/repository/conf/datasources/master-datasources.xml file. Be sure to change the database URL and credentials according to your environment. The WSO2_REG_DB database is used in this example by the shared registry.
    <datasource>
        <name>WSO2_REG_DB</name>
        <description>The datasource used for config</description>
        <jndiConfig>
            <name>jdbc/WSO2RegDB</name>
        </jndiConfig>
        <definition type="RDBMS">
            <configuration>
                <url>jdbc:mysql://[host]:[port]/[reg-db]</url>
                <username>reg_user</username>
                <password>password</password>
                <driverClassName>com.mysql.jdbc.Driver</driverClassName>
                <maxActive>50</maxActive>
                <maxWait>60000</maxWait>
                <testOnBorrow>true</testOnBorrow>
                <validationQuery>SELECT 1</validationQuery>
                <validationInterval>30000</validationInterval>
            </configuration>
        </definition>
    </datasource>
- Add the following to the <BAM_HOME>/repository/conf/registry.xml file.
    <dbConfig name="wso2GReg">
        <dataSource>jdbc/WSO2RegDB</dataSource>
    </dbConfig>
    <remoteInstance url="https://localhost:9443/registry">
        <id>registryInstance</id>
        <dbConfig>wso2GReg</dbConfig>
        <readOnly>false</readOnly>
        <registryRoot>/</registryRoot>
    </remoteInstance>
    <mount path="/_system/config" overwrite="true">
        <instanceId>registryInstance</instanceId>
        <targetPath>/_system/config</targetPath>
    </mount>
    <mount path="/_system/governance" overwrite="true">
        <instanceId>registryInstance</instanceId>
        <targetPath>/_system/governance</targetPath>
    </mount>
- Execute the <BAM_HOME>/dbscripts/mysql.sql script as reg_user in the MySQL reg-db database. It creates the Registry schema for you.
Configuring Zookeeper ensemble
- Download and extract Zookeeper to a preferred location on node1. This location is referred to as <ZOO_HOME> throughout this section. The download URL for version 3.3.6 is http://apache.osuosl.org/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz.
- Create a file named zoo.cfg inside <ZOO_HOME>/conf and add the three nodes as follows. Replace $ZOO_HOME in dataDir with the absolute path of your Zookeeper directory.
    tickTime=2000
    dataDir=$ZOO_HOME/data
    clientPort=2181
    initLimit=5
    syncLimit=2
    server.1=node1:2888:3888
    server.2=node2:2888:3888
    server.3=node3:2888:3888
- Create a new directory called data under <ZOO_HOME> to hold Zookeeper data.
- Add a file named myid to dataDir, containing only the server number. For example, for node1, put 1 inside the myid file.
- Repeat the preceding Zookeeper steps (download, zoo.cfg, data directory, and myid) on node2 and node3 as well, using 2 and 3 as the respective myid values.
- Go to <ZOO_HOME>/bin and execute the following command to start the Zookeeper daemon on each of the nodes:
    sh zkServer.sh start
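The relationship between the server.N entries in zoo.cfg and each node's myid file is easy to get wrong when the files are written by hand. The following is a minimal Python sketch that renders both from a single host list. The helper names and the /opt/zookeeper path are hypothetical and used only for illustration; the property values mirror the zoo.cfg shown above.

```python
from typing import List


def render_zoo_cfg(data_dir: str, hosts: List[str]) -> str:
    """Return zoo.cfg text for an ensemble made of the given hosts."""
    lines = [
        "tickTime=2000",
        f"dataDir={data_dir}",
        "clientPort=2181",
        "initLimit=5",
        "syncLimit=2",
    ]
    # server.N entries: N is the 1-based position of the host in the list.
    for server_id, host in enumerate(hosts, start=1):
        lines.append(f"server.{server_id}={host}:2888:3888")
    return "\n".join(lines) + "\n"


def myid_for(hosts: List[str], host: str) -> str:
    """Return the myid file contents for one host (its N in server.N)."""
    return f"{hosts.index(host) + 1}\n"


HOSTS = ["node1", "node2", "node3"]
# /opt/zookeeper is an assumed install location, not taken from this guide.
ZOO_CFG = render_zoo_cfg("/opt/zookeeper/data", HOSTS)

print(ZOO_CFG)
for name in HOSTS:
    print(f"{name}: myid = {myid_for(HOSTS, name).strip()}")
```

The same zoo.cfg goes on every node; only the myid file differs per node.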
Configuring BAM analyzer nodes
- Add the following to the <BAM_HOME>/repository/conf/etc/coordination-client-config.xml file of node2. It lists the three Zookeeper servers configured above, with their client ports, so that the analyzer node can connect to the ensemble.
    <CoordinationClientConfiguration enabled="true">
        <Servers>
            <Server host="node1" port="2181"/>
            <Server host="node2" port="2181"/>
            <Server host="node3" port="2181"/>
        </Servers>
        <SessionTimeout>5000</SessionTimeout>
    </CoordinationClientConfiguration>
- Add the following to the <BAM_HOME>/repository/conf/etc/tasks-config.xml file in the Analyzer nodes.
    <taskServerMode>CLUSTERED</taskServerMode>
    <taskServerCount>1</taskServerCount>
- Add the MySQL connector JAR to the <BAM_HOME>/repository/components/lib directory of node2. Add the following to WSO2BAM_DATASOURCE in the <BAM_HOME>/repository/conf/datasources/master-datasources.xml file of node2. Be sure to change the database URL and credentials according to your environment. WSO2BAM_DATASOURCE is the default data source available in BAM, and it should be connected to the database you are using. This example uses the bam-db database to store BAM summary data.
    <datasource>
        <name>WSO2BAM_DATASOURCE</name>
        <description>The datasource used for registry and user manager</description>
        <jndiConfig>
            <name>jdbc/WSO2BamDatasource</name>
        </jndiConfig>
        <definition type="RDBMS">
            <configuration>
                <url>jdbc:mysql://[host]:[port]/[bam-db]</url>
                <username>bam_user</username>
                <password>password</password>
                <driverClassName>com.mysql.jdbc.Driver</driverClassName>
                <maxActive>50</maxActive>
                <maxWait>60000</maxWait>
                <testOnBorrow>true</testOnBorrow>
                <validationQuery>SELECT 1</validationQuery>
                <validationInterval>30000</validationInterval>
            </configuration>
        </definition>
    </datasource>
- Repeat BAM analyzer node configurations in node3 as well.
- Start the BAM server on node3 and remove the BAM Tool Box Deployer feature using the feature manager. We remove the feature because having deployers in both Analyzer BAM nodes interferes with Hive task failover.
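The analyzer nodes depend on every Zookeeper server listed in coordination-client-config.xml being reachable on its client port. As a rough pre-flight check, the sketch below sends Zookeeper's "ruok" four-letter command and expects "imok" back. The function name is hypothetical, and the sketch assumes the four-letter commands are enabled, as they are by default in the 3.3.x line used here; later Zookeeper releases may require them to be enabled explicitly.

```python
import socket


def zk_is_ok(host: str, port: int = 2181, timeout: float = 3.0) -> bool:
    """Send 'ruok' to a Zookeeper server; True only if it answers 'imok'."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"ruok")
            reply = b""
            # Read until four bytes arrive or the server closes the socket.
            while len(reply) < 4:
                chunk = conn.recv(4 - len(reply))
                if not chunk:
                    break
                reply += chunk
    except OSError:
        # Covers refused connections, timeouts, and unresolvable host names.
        return False
    return reply == b"imok"
```

For this setup, calling zk_is_ok("node1"), zk_is_ok("node2"), and zk_is_ok("node3") from an analyzer node should each return True once the ensemble is running.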
Configuring the Cassandra cluster
Before you start, increase the heap memory size of the BAM nodes to at least 2 GB and synchronize the system clocks on all nodes.
Add the following configurations to the <BAM_HOME>/repository/conf/etc/cassandra.yaml file in the nodes mentioned below.
To node1:
    cluster_name: Test Cluster
    initial_token: 0
    seed_provider:
        - seeds: "node1"
    listen_address: node1
    rpc_address: node1
    rpc_port: 9160
To node2:
    cluster_name: Test Cluster
    initial_token: 56713727820156410577229101238628035242
    seed_provider:
        - seeds: "node1"
    listen_address: node2
    rpc_address: node2
    rpc_port: 9160
To node3:
    cluster_name: Test Cluster
    initial_token: 113427455640312821154458202477256070485
    seed_provider:
        - seeds: "node1"
    listen_address: node3
    rpc_address: node3
    rpc_port: 9160
You can generate tokens for the nodes using the script available in http://www.datastax.com/docs/0.8/install/cluster_init#calculating-tokens-for-a-single-data-center.
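If the linked script is not at hand, evenly spaced tokens can also be computed directly. The sketch below assumes the cluster uses Cassandra's RandomPartitioner, whose token range is 0 to 2^127, and assigns node i of N the token i * 2^127 / N (integer division). The function name is illustrative; for three nodes the result matches the initial_token values used above.

```python
from typing import List

# Token range of Cassandra's RandomPartitioner (assumed partitioner).
RING_SIZE = 2 ** 127


def initial_tokens(node_count: int) -> List[int]:
    """Evenly spaced initial_token values for a single data center."""
    return [i * RING_SIZE // node_count for i in range(node_count)]


TOKENS = initial_tokens(3)
for node, token in zip(["node1", "node2", "node3"], TOKENS):
    print(f"{node}: initial_token: {token}")
```

Adding a node later changes N, so the tokens of existing nodes would need to be recalculated to keep the ring balanced.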
Change the cassandra-component.xml file in all nodes as follows. This connects the nodes to the Cassandra endpoints.
    <Cassandra>
        <Cluster>
            <Name>Test Cluster</Name>
            <Nodes>node1:9160,node2:9160,node3:9160</Nodes>
            <DefaultPort>9160</DefaultPort>
            <AutoDiscovery disable="false" delay="1000"/>
        </Cluster>
    </Cassandra>
Edit the <BAM_HOME>/repository/conf/advanced/streamdefn.xml file in all nodes as follows. This sets the replication factor and the read and write consistency levels that the data receivers use when accessing Cassandra.
    <StreamDefinition>
        <ReplicationFactor>3</ReplicationFactor>
        <ReadConsistencyLevel>QUORUM</ReadConsistencyLevel>
        <WriteConsistencyLevel>ONE</WriteConsistencyLevel>
        <StrategyClass>org.apache.cassandra.locator.SimpleStrategy</StrategyClass>
    </StreamDefinition>
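To see what these settings mean in practice, it helps to work out how many replicas each operation needs. The sketch below uses the standard Cassandra definitions (QUORUM is floor(RF / 2) + 1, and a read is guaranteed to overlap the latest write only when read replicas + write replicas > RF). The helper names are illustrative and not part of BAM.

```python
def quorum(replication_factor: int) -> int:
    """Number of replicas that form a quorum."""
    return replication_factor // 2 + 1


def replicas_required(level: str, replication_factor: int) -> int:
    """Replicas that must respond for the given consistency level."""
    table = {
        "ONE": 1,
        "QUORUM": quorum(replication_factor),
        "ALL": replication_factor,
    }
    return table[level]


def overlaps(read_level: str, write_level: str, replication_factor: int) -> bool:
    """True when every read set must intersect every write set (R + W > RF)."""
    needed = (replicas_required(read_level, replication_factor)
              + replicas_required(write_level, replication_factor))
    return needed > replication_factor


RF = 3
READ_REPLICAS = replicas_required("QUORUM", RF)
WRITE_REPLICAS = replicas_required("ONE", RF)

print(f"read needs {READ_REPLICAS} of {RF} replicas")
print(f"write needs {WRITE_REPLICAS} of {RF} replicas")
print(f"read always sees latest write: {overlaps('QUORUM', 'ONE', RF)}")
```

With a replication factor of 3, writes at ONE keep succeeding while a single replica is reachable, and reads at QUORUM tolerate one unavailable replica. Because 2 + 1 is not greater than 3, a read issued immediately after a write may not yet see it; this favors write availability for the data receivers.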