Minimum High Availability (HA) Deployment
When streaming events with WSO2 Stream Processor (SP), the node running the SP instance can fail for unforeseen reasons, resulting in a loss of the events being streamed until the node is restarted. The solution is to run WSO2 SP in a High Availability (HA) environment, where the processing of events is not halted by an unexpected failover. The recommended HA deployment for WSO2 SP is the two node minimum HA deployment, where two instances of WSO2 SP run in parallel as depicted in the diagram below.
In this minimum HA setup, one node is assigned as the active node and the other as the passive node. Both nodes process the incoming events, but only the active node publishes the outgoing events. If the active node fails, the passive node activates and starts publishing events from where the active node left off. When the terminated node is restarted, it operates in the passive state, syncing with the currently active node to maintain the HA status.
In order for the two node HA deployment to work, both the active and the passive node must receive the same (duplicate) events. To achieve this, you can follow one of the strategies listed below:
- In each node, deploy one Siddhi application with only a distributed sink to deliver duplicated events, and another Siddhi application to process and publish events.
- If WSO2 products are used, WSO2 data bridge can publish events to both nodes.
- If you use a message broker such as Kafka, both nodes can subscribe to the same topic to receive duplicate events.
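As a sketch of the message-broker strategy, both nodes can consume the same Kafka topic using distinct consumer group IDs, so that Kafka delivers every event to both nodes. The topic name, group IDs, and stream definition below are illustrative assumptions, not values mandated by WSO2 SP:

```sql
-- Hypothetical Kafka source for node 1; node 2 would use a different
-- group.id (e.g., 'sp-node-2') so that both consumer groups receive
-- every event published to the topic.
@source(type='kafka',
        topic.list='sweet-production',    -- assumed topic name
        group.id='sp-node-1',             -- unique per node
        threading.option='single.thread',
        bootstrap.servers='localhost:9092',
        @map(type='json'))
define stream SweetProductionStream (name string, amount double);
```

Because each node is in its own consumer group, Kafka treats them as independent subscribers rather than load-balanced members of one group, which is what produces the required duplication.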
Prerequisites
In order to configure a minimum HA cluster, the following prerequisites must be completed:
- To run this setup, you need a CPU with four cores and a total memory of 4 GB.
- Two binary packs of WSO2 SP must be available.
- A working RDBMS instance to be used for clustering of the two nodes.
- The datasource to be shared by the nodes in the cluster must be already defined in the `<SP_HOME>/conf/worker/deployment.yaml` file.
- Download the MySQL connector from here. Extract it and find the `mysql-connector-java-5.*.*-bin.jar` file. Drop the jar into the `<SP_HOME>/lib` directory in both nodes.
- In order to retrieve the state of the Siddhi applications deployed in the system in case of a scenario where both nodes fail, state persistence must be enabled for both nodes by specifying the same datasource/file location. For detailed instructions, see Configuring Database and File System State Persistence.
- A Siddhi client must be available to publish events to both the nodes in a synchronized manner where publishing of events is not stopped when one receiver node goes down.
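For the state persistence prerequisite above, both nodes should point at the same store. The following `state.persistence` sketch for `<SP_HOME>/conf/worker/deployment.yaml` assumes a shared database store; the datasource and table names are illustrative:

```yaml
# Sketch of periodic DB-backed state persistence.
# Use identical values on BOTH nodes so they share one state store.
state.persistence:
  enabled: true
  intervalInMin: 1                   # periodic snapshot interval
  revisionsToKeep: 2                 # number of snapshots retained
  persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
  config:
    datasource: WSO2_PERSISTENCE_DB  # assumed shared datasource name
    table: PERSISTENCE_TABLE         # assumed table name
```

A file-system store can be used instead, provided both nodes write to the same shared location.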
Configuring a minimum HA cluster
To configure a minimum HA cluster, follow the steps below:
- Note that the following configurations need to be done in the `<SP_HOME>/conf/worker/deployment.yaml` file for both the WSO2 SP nodes in the cluster.
- If you need to run both SP instances on the same host, make sure that you apply a port offset to change the default ports in one of them. For more information about the default ports, see Configuring Default Ports.
- For each node, enter a unique ID for the `id` property under the `wso2.carbon` section (e.g., `id: wso2-sp`). This is used to identify each node within a cluster.
- To allow the two nodes in the cluster to coordinate effectively, configure carbon coordination by updating the `cluster.config` section of the `<SP_HOME>/conf/worker/deployment.yaml` file as follows:
    - To enable the cluster mode, set the `enabled` property to `true`.
    - In order to cluster the two nodes together, enter the same group ID for both nodes (e.g., `groupId: group-1`).
    - Enter the class that defines the coordination strategy for the cluster (e.g., `coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy`).
    - In the `strategyConfig` section, enter information as follows:
        - For clustering of the two nodes to take place, enter the name of the configured datasource shared by the nodes in the cluster (e.g., `datasource: WSO2_CLUSTER_DB`). A datasource with this name must be configured, because data handled by the cluster is persisted in it. The following sample configuration for a MySQL datasource should appear under the `dataSources` subsection of the `wso2.datasources` section in the `<SP_HOME>/conf/worker/deployment.yaml` file. For detailed instructions on how to configure a datasource, see Configuring Datasources.

          Sample MySQL datasource:

              - name: WSO2_CLUSTER_DB
                description: The MySQL datasource used for Cluster Coordination
                jndiConfig:
                  name: jdbc/WSO2ClusterDB
                definition:
                  type: RDBMS
                  configuration:
                    jdbcUrl: 'jdbc:mysql://localhost:3306/WSO2_CLUSTER_DB?useSSL=false'
                    username: root
                    password: root
                    driverClassName: com.mysql.jdbc.Driver
                    maxPoolSize: 50
                    idleTimeout: 60000
                    connectionTestQuery: SELECT 1
                    validationTimeout: 30000
                    isAutoCommit: false

        - Specify the time interval (in milliseconds) at which each node should send a heartbeat pulse to indicate that it is in an active state (e.g., `heartbeatInterval: 1000`).
        - Specify the number of consecutive heartbeat pulses that can be missed before a node is considered inactive (e.g., `heartbeatMaxRetry: 2`). A value of two means that if a node fails to send two consecutive heartbeat pulses, it is identified as inactive and removed from the cluster.
        - Specify the time interval (in milliseconds) at which each node should listen for changes that occur in the cluster (e.g., `eventPollingInterval: 1000`).
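Putting these coordination settings together, the `cluster.config` section of `<SP_HOME>/conf/worker/deployment.yaml` would look as follows, using the example values from this page:

```yaml
# Assembled cluster coordination section (identical on both nodes).
cluster.config:
  enabled: true
  groupId: group-1
  coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
  strategyConfig:
    datasource: WSO2_CLUSTER_DB   # must match a datasource under wso2.datasources
    heartbeatInterval: 1000
    heartbeatMaxRetry: 2
    eventPollingInterval: 1000
```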
- Next, add the `deployment.config` section to the `<SP_HOME>/conf/worker/deployment.yaml` file with the following configurations:
    - To enable two node minimum HA, set the `type` property to `ha` (i.e., `type: ha`).
    - Add a section named `liveSync` and enter the following information under it:
        - Set the `enabled` parameter to `true` to enable direct communication between the two nodes via REST API calls. This ensures that the communication between the nodes is instantaneous.
        - To specify the address to which the HTTP requests by the external client must be directed, enter the relevant host and port in the `advertisedHost` and `advertisedPort` parameters respectively (e.g., `advertisedHost: localhost`, `advertisedPort: 9090`). These are optional parameters that you need to configure only in scenarios where the actual host and/or port differs from the advertised host and/or port (e.g., container-based scenarios). Listener ports are configured under the `listenerConfigurations` section of the `wso2.transport.http` namespace in the `<SP_HOME>/conf/worker/deployment.yaml` file. Note that in a Docker environment, the host and port that are mapped to these listener configurations should be given in the `liveSync` section.
        - To enable direct communication between the nodes, the user credentials of the other node should be given as follows:

              username: <username> # if this is node 1's configuration, enter node 2's username
              password: <password> # if this is node 1's configuration, enter node 2's password

          The default `username` set for a node is `admin`, with password `admin`.
    - Specify the time interval (in milliseconds) at which the passive node must call the active node to update the last published events (e.g., `outputSyncInterval: 60000`). This ensures that events are not lost if a failover takes place and the currently passive node needs to start publishing events. If `liveSync` is enabled, these calls are made via the REST API; if `liveSync` is disabled, the database is used to communicate the information. The shorter the specified interval, the more frequently the passive node is updated; as a result, the number of events queued as well as the number of duplicates published are reduced.
    - Specify the time interval (in milliseconds) within which the passive node should synchronize its state with the active node (e.g., `stateSyncGracePeriod: 120000`). If `liveSync` is disabled, the state is synchronized using the persisted state. For this purpose, a value must be specified for the `intervalInMin` parameter in the `state.persistence` section to enable periodic state persistence.
    - Specify the maximum number of events that can be queued in the sinks of the passive node (e.g., `sinkQueueCapacity: 20000`). Queueing events in the sinks of the passive node ensures that those events are published at least once.
    - Specify the maximum number of events that can be queued in the sources of the passive node (e.g., `sourceQueueCapacity: 20000`). The purpose of queueing events at sources is to ensure that no events are dropped during state synchronization.
    - Specify the time (in milliseconds) given to the passive node to retry a failed state synchronization for each Siddhi application being deployed (e.g., `retryAppSyncPeriod: 60000`). An application is not deployed unless the active node provides a valid state for that application.

    The following is a sample `deployment.config` section:

        deployment.config:
          type: ha
          liveSync:
            enabled: true
            advertisedHost: localhost
            advertisedPort: 9090
            username: admin
            password: admin
          outputSyncInterval: 60000
          stateSyncGracePeriod: 120000
          sinkQueueCapacity: 20000
          sourceQueueCapacity: 20000
          retryAppSyncPeriod: 60000
Publishing events to the cluster
For your SP minimum HA deployment to function, both the active and the passive node must receive the same (duplicate) events. To achieve this, you can follow one of the strategies listed below:
- In each node, deploy one Siddhi application with only a distributed sink to deliver duplicated events, and another Siddhi application to process and publish events.
- If WSO2 products are used, WSO2 data bridge can publish events to both nodes.
- If you use a message broker such as Kafka, both nodes can subscribe to the same topic to receive duplicate events.
Publishing to both nodes via a distributed sink
In a minimum HA cluster with two nodes that uses sources such as HTTP, the nodes can be set up with distributed sinks as shown below.
In the above setup, the following Siddhi applications are deployed in each node:
- Distributor.siddhi: The single source to which the distributor listens can be an HTTP source, a JMS queue, etc., that only sends one event to one consumer. In such sources, an event is removed from the source once it is sent to one consumer; as a result, another consumer cannot receive the same data. To address this, a distributor Siddhi application functions as the single consumer that receives a specific event and then resends it to multiple other sources. A distributor Siddhi application contains a distributed sink configuration; events are resent to multiple other sources via this sink. These sinks are configured with the `@distribution` annotation.
- Executor.siddhi: The purpose of this Siddhi application is to publish processed events.

As illustrated in the above diagram, the distributor Siddhi application distributes events to two other sources. However, because passive nodes do not publish events, the distributor in the passive node directs the events it consumes to the buffer. This results in the events received by the distributor of the passive node being lost. To avoid this, you can enable force publishing for the distributed sink by setting the `forcePublish` property to `true`.
The following is a sample distributed sink configuration with force publishing enabled.
The `forcePublish` parameter can be used only in a distributed sink configured in a minimum HA cluster.
@sink(type='tcp',
sync='true',
@map(type='json'),
@distribution(
forcePublish='true',
strategy='broadcast',
@destination(url='tcp://0.0.0.0:5511/sourceStreamOne'),
@destination(url='tcp://0.0.0.0:5512/sourceStreamOne')
))
define stream DistributionStream (name string, amount double);
When force publishing is enabled for the distributor Siddhi application of the passive node, events received by each distributor Siddhi application are resent to the two other sources. This ensures that each node receives all the events sent to the cluster.
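On the receiving side, each node would have a source matching one of the sink destinations above. A minimal sketch, assuming each node's TCP server is configured so that node 1 listens on port 5511 and node 2 on port 5512:

```sql
-- Hypothetical source in Executor.siddhi; the 'context' value matches
-- the path segment of the sink destination URL
-- (e.g., tcp://0.0.0.0:5511/sourceStreamOne).
@source(type='tcp', context='sourceStreamOne', @map(type='json'))
define stream sourceStreamOne (name string, amount double);
```

The stream definition mirrors the `DistributionStream` attributes (`name string, amount double`) so that events pass through unchanged.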
Starting the cluster
- Save the required Siddhi applications in the `<SP_HOME>/deployment/siddhi-files` directory in both nodes. In order to ensure that the Siddhi applications are completely synchronized between the active and the passive node, they must be added to the `siddhi-files` directory before server startup. However, the synchronization can take place effectively even if the Siddhi applications are added while the server is running.

  When deploying Siddhi applications in a two node minimum HA cluster, it is recommended to use a content synchronization mechanism, since Siddhi applications must be deployed to both worker nodes. You can use a common shared file system such as Network File System (NFS) or any other shared file system that is available, and mount the `<SP_HOME>/deployment/siddhi-files` directory of both nodes to it.

- Start both servers by navigating to `<SP_HOME>/bin` and issuing the following command:

  For Windows: `worker.bat`
  For Linux: `./worker.sh`

  To start two WSO2 SP nodes on the same machine, the `listenerConfigurations` under the `wso2.transport.http` namespace in the `<SP_HOME>/conf/worker/deployment.yaml` file must be updated to listen on different ports, and the corresponding values must be changed in the `liveSync` section. The `offset` property under the `ports` section of the `wso2.carbon` section in `<SP_HOME>/conf/worker/deployment.yaml` should also be changed in one SP instance to avoid conflicts when starting both servers.

- If the cluster is correctly configured, the following CLI logs are printed without any error logs.

  In the active node:

      [2017-10-23 08:56:57,380] INFO {org.wso2.carbon.stream.processor.core.internal.ServiceComponent} - WSO2 Stream Processor Starting in Two Node Minimum HA Deployment
      [2017-10-23 08:56:57,392] INFO {org.wso2.carbon.stream.processor.core.ha.HAManager} - HA Deployment: Starting up as Active Node

  In the passive node:

      [2017-10-23 08:57:39,775] INFO {org.wso2.carbon.stream.processor.core.internal.ServiceComponent} - WSO2 Stream Processor Starting in Two Node Minimum HA Deployment
      [2017-10-23 08:57:39,787] INFO {org.wso2.carbon.stream.processor.core.ha.HAManager} - HA Deployment: Starting up as Passive Node

