This site contains the documentation that is relevant to older WSO2 product versions and offerings.
For the latest WSO2 documentation, visit https://wso2.com/documentation/.
Configuring the Broker Nodes
This section provides instructions on configuring the broker nodes. You need to configure the following files for clustering to work.
Important: The RDBMS identifies the latest record by timestamp, so a newer timestamp always takes precedence. The clocks of all Message Broker nodes in your cluster must therefore be kept in sync. Use the following commands to synchronize the time on your server. Note that you need administrator permissions to do this.
$ sudo apt-get install ntpdate
$ sudo ntpdate pool.ntp.org
Configuring broker.xml
Open the <MB_HOME>/repository/conf/broker.xml file. This is the root configuration file of Message Broker. The changes made to this file must be done in all the WSO2 Message Broker nodes.
Configure the slots for both broker nodes.
<slots>
    <!--maximum time interval where slot can be retain in memory before updating to the cluster, in milliseconds -->
    <slotRetainTimeInMemory>1000</slotRetainTimeInMemory>
    <!--rough estimate for size of a slot-->
    <windowSize>1000</windowSize>
    <!--Number of SlotDeliveryWorker threads that should be started-->
    <workerThreadCount>5</workerThreadCount>
</slots>
See the following table for details on these configurations.
Configuration | Description
slotRetainTimeInMemory | The interval, in milliseconds, at which the in-memory slot information is updated to the cluster.
windowSize | A sub-element of the <slots> tag. WSO2 Message Broker assigns a slot to each subscriber when the subscriber arrives, and each subscriber reads messages from its assigned slot. A slot is a chunk of consecutive messages. windowSize is a rough estimate of the maximum number of messages in a slot. The default is 1000.
workerThreadCount | The number of worker threads in MB. These threads are responsible for reading message metadata from the message store. The default value is 5.
Do the Thrift-related configurations. Thrift is used to maintain and sync slot (message group) ranges between MB nodes. One of the MB nodes acts as the coordinator and runs the Thrift server. If this node goes down, the cluster assigns the coordinator role to one of the remaining MB nodes.
<coordination>
    <nodeID>default</nodeID>
    <thriftServerHost>localhost</thriftServerHost>
    <thriftServerPort>7611</thriftServerPort>
</coordination>
See the following table for details on these configurations.
Configuration | Description
coordination | This configuration is related to MB Thrift communications.
nodeID | In a clustered deployment, an ID is assigned to each MB node via the cluster node identifier. This element can be used to override the cluster node identifier for this MB node. If the value of this element is left as default, the node ID is generated using the IP address and a universally unique identifier (UUID). The node ID of each member in a cluster must be unique.
thriftServerHost | A sub-element of the <coordination> tag. WSO2 MB uses Apache Thrift for communications relating to message delivery. Therefore, an Apache Thrift server is started in each MB node in a clustered deployment. This element should point to the IP address of the MB node that hosts the Thrift server. The default value is localhost. For example, if you are configuring a Message Broker node hosted on 192.168.0.102 as the Thrift server, this value should be 192.168.0.102.
thriftServerPort | Another sub-element of the <coordination> tag. This should point to the port of the Thrift server in MB. The default port is 7611. It is recommended to use this port for all broker nodes in your cluster.
You must configure a DBMS for storage. See Configuring the DBMS for Storage for more information on how to do this. When configuring the DBMS, you must uncomment the relevant configuration for messageStore and contextStore in the broker.xml file. These configurations depend on the DBMS that you are connecting to. When doing this, note the following.
Element | Description
messageStore | The class that is used to access an external RDBMS database to operate on messages. The classes for each DBMS are as follows.
- Standard RDBMS like MySQL or MSSQL: org.wso2.andes.store.rdbms.RDBMSMessageStoreImpl
- H2 in-memory mode DBMS: org.wso2.andes.store.rdbms.h2.H2MemMessageStoreImpl
- Cassandra CQL-based DBMS: org.wso2.andes.store.cassandra.cql.CQLBasedMessageStoreImpl
- Cassandra Hector-based DBMS: org.wso2.andes.store.cassandra.hector.HectorBasedMessageStoreImpl
contextStore | The class that is used to access an external RDBMS database to operate on server context, such as subscriptions. The classes for each DBMS are as follows.
- Standard RDBMS like MySQL or MSSQL: org.wso2.andes.store.rdbms.RDBMSAndesContextStoreImpl
- H2 in-memory mode DBMS: org.wso2.andes.store.rdbms.h2.H2MemAndesContextStoreImpl
- Cassandra CQL-based DBMS: org.wso2.andes.kernel.CassandraBasedAndesContextStore
- Cassandra Hector-based DBMS: org.wso2.andes.store.cassandra.hector.HectorBasedAndesContextStoreImpl
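Two of the coordination rules in this section can be checked mechanically before the cluster is started: explicitly set node IDs must be unique, and using the same Thrift port on all nodes is recommended. The following Python sketch is one possible way to do this. It assumes that you have copied the <coordination> fragment of each node's broker.xml into a string. The function name check_coordination and the sample node values are hypothetical and are not part of WSO2 MB.

```python
import xml.etree.ElementTree as ET
from collections import Counter


def check_coordination(fragments):
    """Return a list of warnings for a list of <coordination> XML strings."""
    node_ids, ports = [], set()
    for xml_text in fragments:
        root = ET.fromstring(xml_text.strip())
        node_ids.append(root.findtext("nodeID").strip())
        ports.add(root.findtext("thriftServerPort").strip())

    warnings = []
    # "default" lets MB generate the ID, so only explicit IDs can collide.
    explicit = Counter(i for i in node_ids if i != "default")
    for node_id, count in explicit.items():
        if count > 1:
            warnings.append(f"nodeID {node_id!r} is used by {count} nodes")
    if len(ports) > 1:
        warnings.append(f"nodes use different Thrift ports: {sorted(ports)}")
    return warnings


# Hypothetical fragments for two nodes. Only nodeID and thriftServerPort
# are inspected; the host value is copied from the example in this guide.
node_a = """
<coordination>
    <nodeID>mb1</nodeID>
    <thriftServerHost>192.168.0.102</thriftServerHost>
    <thriftServerPort>7611</thriftServerPort>
</coordination>
"""
node_b = """
<coordination>
    <nodeID>mb1</nodeID>
    <thriftServerHost>192.168.0.102</thriftServerHost>
    <thriftServerPort>7612</thriftServerPort>
</coordination>
"""

for warning in check_coordination([node_a, node_b]):
    print(warning)
```

With the sample values, the sketch reports the duplicated node ID and the mismatched ports. Nodes that keep the default node ID are skipped, because their IDs are generated by the broker.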
Configuring the qpid-config.xml file
Enable heartbeat messaging for each of the broker nodes by following the steps given below. This is required for handling TCP connections between the broker nodes and client systems (publishers and subscribers).
What is heartbeat messaging?
When a client is connected to the broker, both the broker and the client should be able to detect problem situations where the TCP connection is half open or where the connecting client/broker is unresponsive. This can be achieved by enabling heartbeat messaging between the broker and the client.
When this configuration is enabled, both the broker and the connecting client will be able to check if the connection is active, and if the connecting system (broker or client) is active by periodically sending heartbeat messages to each other.
For example, you can set the heartbeat delay time to 30 seconds. This means that if the broker (or the client) does not receive the expected response from the other system within 30 seconds, it will send a heartbeat message to check if the connection is inactive or if the other system is inactive. If the connection is inactive or is half-open, there will be a clear indication of connection failure. However, if the connection is active but the other system is unresponsive, the broker or the client will continue to send heartbeat messages (every 30 seconds). By default, the broker and clients are configured to terminate (time out) the connection after sending two heartbeat messages; however, this timeout configuration can be changed.
Note: In a scenario where the subscriber client is sending the heartbeat message to the broker node, if the connection is found to be broken or if the broker node is inactive, the connection will failover to another broker node in the cluster.
- Open the qpid-config.xml file that is stored in the <MB_HOME>/repository/conf/advanced/ directory.
- Set the heartbeat delay as shown below. The recommended heartbeat delay is 30 seconds.
<heartbeat>
    <delay>30</delay>
    <timeoutFactor>2.0</timeoutFactor>
</heartbeat>
The elements in the above configuration are explained below.
delay | The <delay> element specifies the time interval between heartbeat messages. This is the time that the broker or client will wait to receive a response from the other party. If the response is not received within this time, a heartbeat message is triggered.
timeoutFactor | The number of heartbeat messages the broker will send to the client before terminating the connection. That is, if the timeoutFactor is 2, the broker will send heartbeat messages every 30 seconds twice, and if a response is not received from the client, the connection will be terminated.
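As a rough illustration of how the two heartbeat elements combine, the following Python sketch reads the fragment shown above and multiplies the delay by the timeout factor. It assumes that the time before an unresponsive connection is terminated is delay × timeoutFactor, which is consistent with the description above (two heartbeats, 30 seconds apart) but is not confirmed here as the broker's exact internal calculation. The function name heartbeat_timeout_seconds is hypothetical.

```python
import xml.etree.ElementTree as ET

# The <heartbeat> fragment from qpid-config.xml, as shown in this guide.
HEARTBEAT_XML = """
<heartbeat>
    <delay>30</delay>
    <timeoutFactor>2.0</timeoutFactor>
</heartbeat>
"""


def heartbeat_timeout_seconds(xml_text):
    """Return delay * timeoutFactor for a <heartbeat> fragment (assumed rule)."""
    root = ET.fromstring(xml_text.strip())
    delay = float(root.findtext("delay"))
    factor = float(root.findtext("timeoutFactor"))
    return delay * factor


print(heartbeat_timeout_seconds(HEARTBEAT_XML))  # 60.0
```

Under this assumption, the recommended settings give an unresponsive peer about 60 seconds before the connection is dropped.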
Configuring master-datasources.xml
In this configuration file, you must configure the registry database.
- Open the <MB_HOME>/repository/conf/datasources/master-datasources.xml file. The changes made to this file must be done in both broker nodes.
- Uncomment or add the following configuration.
<datasource>
    <name>REGISTRY_DB</name>
    <description>The datasource used for registry- config/governance</description>
    <jndiConfig>
        <name>jdbc/WSO2RegistryDB</name>
    </jndiConfig>
    <definition type="RDBMS">
        <configuration>
            <url>jdbc:mysql://carbondb.mysql-wso2.com:3306/REGISTRY_DB?autoReconnect=true</url>
            <username>regadmin</username>
            <password>regadmin</password>
            <driverClassName>com.mysql.jdbc.Driver</driverClassName>
            <maxActive>50</maxActive>
            <maxWait>60000</maxWait>
            <testOnBorrow>true</testOnBorrow>
            <validationQuery>SELECT 1</validationQuery>
            <validationInterval>30000</validationInterval>
        </configuration>
    </definition>
</datasource>
We have used MySQL as an example here, but you can use any database for this and change the configurations accordingly. Make sure to replace username and password with your MySQL database username and password. You must also configure the storage database here; see Configuring the DBMS for Storage for more information on how to do this.
Configuring registry.xml
In this configuration file, you must correctly mount the registry database.
- Open the <MB_HOME>/repository/conf/registry.xml file. The changes made to this file must be done in all the WSO2 Message Broker nodes.
- Uncomment or add the following mounting configurations.
<dbConfig name="sharedregistry">
    <dataSource>jdbc/WSO2RegistryDB</dataSource>
</dbConfig>
<remoteInstance url="https://localhost:9443/registry">
    <id>instanceid</id>
    <dbConfig>sharedregistry</dbConfig>
    <readOnly>false</readOnly>
    <enableCache>true</enableCache>
    <registryRoot>/</registryRoot>
    <cacheId>regadmin@jdbc:mysql://carbondb.mysql-wso2.com:3306/REGISTRY_DB?autoReconnect=true</cacheId>
</remoteInstance>
<mount path="/_system/config" overwrite="true">
    <instanceId>instanceid</instanceId>
    <targetPath>/_system/config</targetPath>
</mount>
<mount path="/_system/governance" overwrite="true">
    <instanceId>instanceid</instanceId>
    <targetPath>/_system/governance</targetPath>
</mount>
When doing mounting configurations, make note of the following.
- The dataSource you specify under the <dbConfig name="sharedregistry"> tag must match the jndiConfig name you specified in the master-datasources.xml file.
- The registry mount path is used to identify the type of registry. For example, "/_system/config" refers to the configuration registry, and "/_system/governance" refers to the governance registry.
- The dbConfig entry enables you to identify the datasource you configured in the master-datasources.xml file. We use the unique name sharedregistry to refer to that datasource entry.
- The remoteInstance section refers to an external registry mount. We can specify the read-only/read-write nature of this instance as well as caching configurations and the registry root location. For a Message Broker node, the readOnly property must be set to false, as all broker nodes require both read and write permissions.
- Additionally, we must specify cacheId, which enables caching to function properly in the clustered environment. The cacheId of the remote instance is based on the JDBC connection URL of the registry database and should be in the format $database_username@$database_url, where $database_username is the username of the remote instance database and $database_url is the remote instance database URL. This cacheId is used to identify the cache to look for when caching is enabled. In this case, the database we should connect to is REGISTRY_DB, which is the database shared across all the broker nodes. You can identify that by looking in the mounting configurations, where the same datasource is being used.
- You must define a unique name "id" for each remote instance, which is then referred to from mount configurations. In the above example, the unique ID for the remote instance is instanceid.
- In each of the mounting configurations, we specify the actual mount path and target mount path. The targetPath can be any meaningful name. In this instance, it is /_system/config.
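Two of the points above lend themselves to a quick check: the cacheId format and the match between the registry dataSource and the JNDI name of the datasource. The following Python sketch illustrates both, using trimmed copies of the fragments shown in this guide. The helper name build_cache_id is hypothetical, and the sketch is only an aid for reviewing your own files, not a WSO2 tool.

```python
import xml.etree.ElementTree as ET


def build_cache_id(username, jdbc_url):
    """Compose a cacheId in the $database_username@$database_url format."""
    return f"{username}@{jdbc_url}"


# Trimmed copy of the datasource from master-datasources.xml.
DATASOURCE_XML = """
<datasource>
    <name>REGISTRY_DB</name>
    <jndiConfig><name>jdbc/WSO2RegistryDB</name></jndiConfig>
    <definition type="RDBMS">
        <configuration>
            <url>jdbc:mysql://carbondb.mysql-wso2.com:3306/REGISTRY_DB?autoReconnect=true</url>
            <username>regadmin</username>
        </configuration>
    </definition>
</datasource>
"""

# The dbConfig entry from registry.xml.
DB_CONFIG_XML = """
<dbConfig name="sharedregistry">
    <dataSource>jdbc/WSO2RegistryDB</dataSource>
</dbConfig>
"""

datasource = ET.fromstring(DATASOURCE_XML.strip())
db_config = ET.fromstring(DB_CONFIG_XML.strip())

jndi_name = datasource.findtext("jndiConfig/name")
config = datasource.find("definition/configuration")
cache_id = build_cache_id(config.findtext("username"), config.findtext("url"))

# The dataSource in registry.xml must match the JNDI name of the datasource.
print(db_config.findtext("dataSource") == jndi_name)
# The cacheId to place in the <remoteInstance> section.
print(cache_id)
```

For the example values, the first line printed is True and the second is the same cacheId that appears in the registry.xml configuration above.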
Configuring axis2.xml
This section provides all the configurations related to enabling and correctly configuring clustering.
- Open the <MB_HOME>/repository/conf/axis2/axis2.xml file. The changes made to this file must be done in both broker nodes.
- Enable Hazelcast clustering by doing the following configuration.
<clustering class="org.wso2.carbon.core.clustering.hazelcast.HazelcastClusteringAgent" enable="true">
Specify the membership scheme that you plan to use for clustering. See Clustering Overview for more information on membership schemes.
<parameter name="membershipScheme">wka</parameter>
The localMemberHost of the server is "127.0.0.1" by default. Change it to the IP address of the host on which the MB server runs. This has to be done for each server. For example, on server 192.168.0.102, it looks as follows.
<parameter name="localMemberHost">192.168.0.102</parameter>
- Select a port to communicate cluster-wide information among MB nodes. This is 4000 by default.
<parameter name="localMemberPort">4000</parameter>
When using the "wka" membership scheme, each member of the cluster should be configured with the information about other cluster members. In this case, the other broker node must be defined here. For example, if the broker node you are configuring is on 192.168.0.102, you must configure the other broker node, which is hosted on 192.168.0.103, as indicated below.
<members>
    <member>
        <hostName>192.168.0.103</hostName>
        <port>4000</port>
    </member>
</members>
Add the following custom serializers under the <clustering> element. These serializers allow nodes to interpret the information that is replicated via Hazelcast.
<parameter name="hazelcastSerializers">
    <serializer typeClass="org.wso2.andes.server.cluster.coordination.hazelcast.custom.serializer.wrapper.TreeSetLongWrapper">org.wso2.andes.server.cluster.coordination.hazelcast.custom.serializer.TreeSetLongWrapperSerializer</serializer>
    <serializer typeClass="org.wso2.andes.server.cluster.coordination.hazelcast.custom.serializer.wrapper.TreeSetSlotWrapper">org.wso2.andes.server.cluster.coordination.hazelcast.custom.serializer.TreeSetSlotWrapperSerializer</serializer>
    <serializer typeClass="org.wso2.andes.server.cluster.coordination.hazelcast.custom.serializer.wrapper.HashmapStringTreeSetWrapper">org.wso2.andes.server.cluster.coordination.hazelcast.custom.serializer.HashMapStringTreeSetWrapperSerializer</serializer>
</parameter>
Note that you also have to comment out any other existing serializers since the serializers available by default in the axis2.xml file cannot be used for this purpose.
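With the "wka" membership scheme described earlier in this section, each node lists the other members of the cluster, so the <members> block differs from node to node. The following Python sketch shows one way to generate that block for a given node from a single list of cluster hosts. The function name members_xml and the host list are hypothetical, and the generated text still has to be pasted into axis2.xml by hand.

```python
import xml.etree.ElementTree as ET


def members_xml(all_hosts, local_host, port=4000):
    """Build the <members> block listing every node except the local one."""
    members = ET.Element("members")
    for host in all_hosts:
        if host == local_host:
            continue  # a node lists only the other cluster members
        member = ET.SubElement(members, "member")
        ET.SubElement(member, "hostName").text = host
        ET.SubElement(member, "port").text = str(port)
    ET.indent(members, space="    ")  # pretty-print; requires Python 3.9+
    return ET.tostring(members, encoding="unicode")


# The two broker nodes used as examples in this guide.
cluster = ["192.168.0.102", "192.168.0.103"]
print(members_xml(cluster, local_host="192.168.0.102"))
```

Running the sketch for 192.168.0.102 produces a <members> block that contains only 192.168.0.103, which matches the example configuration above. Keeping one host list and generating each node's block from it reduces the risk of a node listing itself or omitting a peer.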