Fully Distributed Deployment
Introduction
The most common deployment pattern for WSO2 SP is the Minimum High Availability Deployment that offers high availability with the minimum amount of resources. However, there are a few user scenarios where the HA (High Availability) deployment is not sufficient to handle the throughput.
The Distributed Deployment pattern is supported so that a high volume of data can be distributed among multiple SP instances instead of having them accumulated at a single point. It is suitable to be used in scenarios where the volume of data handled is too high to be managed in a single SP instance or a minimum high availability deployment.
Distributed Siddhi applications
A Siddhi Application is a combination of multiple Siddhi executional elements. A Siddhi executional element can be a Siddhi Query or a Siddhi Partition. In distributed processing perspective, a collection of these execution elements is called an execution group. Execution group is the smallest unit of execution.
Distributed processing of a Siddhi application allows users to execute multiple instances of each execution group in-parallel in multiple SP instances.
Users can specify execution groups and the parallelism to execute them by annotating existing Siddhi applications. Following sample application is annotated in that manner.
@App:name('wso2-app') @info(name = ‘query-1') @dist(execGroup='group-1') from TempStream#window.time(2 min) select avg(temp) as avgTemp, roomNo, deviceID insert all events into AvgTempStream; @info(name = ‘query-2') @dist(execGroup='group-1') from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ] within 10 min select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp insert into AlertStream; @info(name = ‘query-3') @dist(execGroup='group-2' ,parallel ='2') from TempStream [(roomNo >= 100 and roomNo < 110) and temp > 40 ] select roomNo, temp insert into HighTempStream;
This sample disributed Siddhi application contains two execution groups named group-1 and group-2 (defined via execGroup='<GROUP_ID>'
e.g., execGroup='group-1'
). group-1
contains two queries named query-1
and query-2
. group-2
contains query-3
. No specific number of parallel instances are specified for group-1
. Therefore, only one instance is created for it at runtime by default. Two parallel instances are specified for group-2
.
The following is an illustration of how each parallel instance is created as a separate Siddhi application.
Each Siddhi application is deployed in the available resource nodes of the distributed cluster. All these Siddhi applications communicate with each other via the messaging layer. The system has the ability to interact with the messaging layer and create topics representing each stream, and it configures the Siddhi applications to use these topics as required.
For detailed information, see Converting to a Distributed Streaming Application.
Deployment architecture
WSO2 Stream Processor has a component named Dashboard in the User Interface and Dashboard layer. The Dashboard allows users to view the output of analytics in an interactive manner. It also conveys observability information the cluster, the status of the list of the applications (i.e., Siddhi applications) currently submitted, and the status of each Stream Processor node. The JVM metrics, as well as Siddhi application level metrics, can be viewed through this dashboard.
Job Manager nodes handle all the Management layer related functionalities. This layer contains two WSO2 SP Manager instances configured to run in high availability mode. Here, the Manager parses the distributed Siddhi application provided by the user, partitions it into multiple Siddhi applications, wires them using messaging layer topics, and deploys them in the available worker nodes. Management layer also handles the effects of the worker nodes joining/leaving the distributed cluster by re-distributing the Siddhi applications accordingly.
The processing layer (also known as the resource cluster) is represented by multiple WSO2 SP Worker instances that are configured as workers. Each WSO2 SP worker instance in this layer registers itself to the Manager Cluster when it starts. These workers periodically send their heartbeats to the Manager Cluster. This allows the Managers to identify the active worker nodes and the inactive ones. The worker nodes (resource nodes) run the Siddhi applications assigned to them by their Manager nodes. In addition, they are also capable of handling network partitions in a graceful manner as depicted in the following diagram.
As depicted above, a worker node periodically synchronizes its configurations and the Siddhi applications with the manager Node. If the network gets partitioned or if the manager becomes unreachable, it undeploys the applications deployed in it. By doing so, it allows the Siddhi applications to be rescheduled in other work nodes that are maintaining their connections with the manager nodes.
It is required to use Apache Kafka or NATS as the messaging layer to configure a fully distributed SP cluster. Persistence stores of the Persistence layer can be RDBMS databases that store both configuration and system state data. Identity and access management of all the WSO2 Stream Processor nodes can be handled by any SCIM supported Identity provider such as the WSO2 Identity and Access Management(WSO2 IAM).
There are no restrictions to run WSO2 Stream Processor in the distributed mode on any environment. It can run in the distributed mode on bare metal, VMs, and containers. Here the manager nodes are grouped in a single cluster backed by a database for correlation. Similarly, dashboard nodes can also be deployed in a separate cluster. The worker nodes, on the other hand, are not aware of each other. They are synchronized with manager nodes from which they receive instructions.
Manager cluster
The manager cluster contains two or more WSO2 SP instances configured to run in the high availability mode. The manager cluster is responsible for parsing a user-defined distributed Siddhi application, dividing it to multiple Siddhi applications, creating the required topics and then deploying them in the available resource nodes. The manager cluster also handles resource nodes that join/leave the distributed cluster, and re-schedules the Siddhi applications accordingly. Since manager nodes are deployed in a high availability mode, if and when the active manager node goes down, another node in the manager will be elected as the cluster to handle the resource cluster.
Resource cluster
A resource cluster contains multiple WSO2 SP instances. Each instance sends a periodic heartbeat to the manager cluster so that the managers at any given time can identify the resource nodes that are active in the cluster. The resource nodes are responsible for running Siddhi applications assigned to them by the manager nodes. A resource node continues to run its Siddhi applications until a manager node undeploys them, or until it is no longer able to reach a manager node to send its heartbeat. If a manager node is unreachable for a specified amount of time, the resource node stops operating, removes its deployed Siddhi applications and waits until it can reach a manager node again.
The resource cluster can include both receiver workers and resource workers. You can specify the minimum number of receiver worker nodes to be included. However, you need to ensure that the minimum number specified is greater than one. This is because, if one or more distributed Siddhi applications contain a user-defined source such as HTTP or Thrift, then that Siddhi application cannot be deployed in a resource worker node. Therefore, at least one receiver worker node needs to be available in the resource cluster to ensure that distributed Siddhi applications are successfully deployed.
Deployed Siddhi applications communicate among themselves via the messaging layer.
Kafka cluster
It is required to insrtall Kafka and Zookeeper to configure a fully distributed deployment.
A Kafka cluster holds all the topics used by distributed Siddhi applications. All communications between execution groups take place via Kafka.
Publishing and receiving data from distributed Siddhi applications can be done via Kafka or other Siddhi sources as follows:
- Via Kafka
To use Kafka for publishing and receiving data, you can either define a Kafka source in the initial distributed Siddhi application or use the Kafka source created by the distributed implementation. Via Other Siddhi Sources
This invoves definingh the source in the initial distributed Siddhi application.At least one receiver worker node should be configured in the cluster.
Messaging Cluster
It is required to install either Kafka or NATS broker as the messaging layer to configure a fully distributed deployment.
Messaging cluster holds all the topics used by distributed Siddhi applications for communications between the nodes in it. Publishing and receiving data from distributed Siddhi applications can also be done via the same messaging cluster. To use the messaging layer as the event entry point, you need to define sources based on the type of the messaging layer. Then the generated partial Siddhi applications consume from those predefined topics. Topics to send data across the generated partial applications are automatically created.
Configuring a distributed cluster
This section explains how to configure a distributed WSO2 SP cluster.
Prerequisites
In order to configure a fully distributed HA cluster, the following prerequisites must be completed:
- A WSO2 SP binary pack must be available for each node in the cluster.
- Each SP node must have a distinct ID under wso2.carbon in the
<SP_HOME>/conf/manager/deployment.yaml
or<SP_HOME>/conf/worker/deployment.yaml
file depending on the cluster node being configured. A working RDBMS instance to be used for clustering of the manager nodes. Currently H2, MySQL, Oracle, Postgre and MSSQL databases are supported.
Add the database driver corresponding with the used DB system to the
<SP_HOME>/lib
directory.If the driver jar is not an OSGi bundle, you need to convert the JAR into a bundle using the
<SP_HOME>/bin/jartobundle.sh
script. For more information, see Configuring Datasources.- The messaging cluster based on Kafka or NATS must be started, and the host and ports of the cluster must be known. You also need a ZooKeeper cluster to facilitate the Kafka cluster. The following versions of each product is supported.
- Zookeeper version: 3.4.6
- Kafka version: 2.11-0.10.0.0
- NATS streaming server version: 0.11.x
- The following tasks need to be carried out depending on the messaging layer in order to make WSO2 SP compatible with the messaging layer.
Converting JARs to OSGi bundle
To convert jars to OSGi bundles, follow the steps below:
- Create the source directory (e.g., named
jars
) and copy the required JARs into the created directory. - Create another directory (e.g., named
osgi
). This is the destination directory to which the converted OSGi bundles are to be added. To convert the JARs, navigate to the
<SP_HOME>/bin
directory and issue one of the following commands.- For Linux:
./jartobundle.sh <path_to_source_directory> <path_to_destination_directory>
- For Windows:
./jartobundle.bat <path_to_source_directory> <path_to_destination_directory>
If the JARs are successfully converted, the following message appears in the terminal.
- INFO: Created the OSGi bundle <jar-name>.jar
for
JAR file <absolute_path>/jars/<jar-name>.jar
- For Linux:
The converted OSGi bundles are now available in the destination directory. Copy them and place them in the
<SP_HOME>/lib
directory.
Configuring the cluster
To configure a fully distributed HA cluster, follow the procedure below:
Configure manager nodes
To configure a node as a manager node, update the <SP_HOME>/conf/manager/deployment.yaml
file as follows. The fully distributed cluster can have one or more manager nodes. For more information on how to set up cluster coordination see Configuring Cluster Coordination.
- In the
cluster.config
section, make the following changes.- To enable the cluster mode, set the
enabled
property totrue
. - In order to cluster all the manager nodes together, enter the same cluster ID as the group ID for all the nodes (e.g.,
groupId: group-1
). - Enter the ID of the class that defines the coordination strategy for the cluster as shown in the example below.
e.g.,coordinationStrategyClass: org.wso2.carbon.cluster.coordinator.rdbms.RDBMSCoordinationStrategy
- To enable the cluster mode, set the
- In the
strategyConfig
section ofcluster.config
, enter information for the required parameters as follows.Enter the ID of the datasource shared by the nodes in the cluster as shown in the example below. Data handled by the cluster are persisted here.
datasource: SP_MGT_DB
The SP_MGT_DB datasource is configured to an h2 database by default. You must create a MySQL database and then configure this datasource in the <
SP_HOME>/conf/manager/deployment.yaml
file of the required manager. The following is a sample configuration.- name: SP_MGT_DB description: The MySQL datasource used for Cluster Coordination # JNDI mapping of a data source jndiConfig: name: jdbc/WSO2ClusterDB # data source definition definition: # data source type type: RDBMS # data source configuration configuration: jdbcUrl: 'jdbc:mysql://<host>:<port>/<database_name>?useSSL=false' username: <Username_Here> password: '<Password_Here>' driverClassName: com.mysql.jdbc.Driver maxPoolSize: 50 idleTimeout: 60000 connectionTestQuery: SELECT 1 validationTimeout: 30000 isAutoCommit: false
- Specify the time interval (in milliseconds) at which heartbeat pulse should occur within the manager cluster to indicate that a manager is in an active state as shown in the example below.
heartbeatInterval: 500
- Specify the number of times the heartbeat pulse must be unavailable at the specified time interval in order to consider a manager node as inactive as shown in the example below. A value of four means that if a manager node fails to send four consecutive heart beat pulses, it will be identified as unresponsive and another manager node will act as the active node in the manager cluster.
heartbeatMaxRetry: 4 - Specify the time interval (in milliseconds) at which each node should listen for changes that occur in the cluster as shown in the example below.
eventPollingInterval: 1000
- In the
deployment.config
section, enter information as follows:- In the
type
field, enter the type of the cluster asdistributed
.
type: distributed
For the
httpsInterface
parameter, specify the host and the port of the node.Host should be the IP of the network interface though which nodes are connected. (i.e LAN IP). Each node should have a separate port if deployed in same physical machine.
e.g.,
host:localhost, port:9543
- Specify the time interval (in milliseconds) at which resource nodes connected to this manager should send heartbeat pulses to indicate that they are in a working state as shown in the example below.
e.g., heartbeatInterval: 2000 Specify the number of times a resource node's heartbeat should be unavailable for the manager node to identify that the resource node as unresponsive. i.e. according to the below example, if the resource node fails to send 4 consecutive heartbeat pulses it will be recognized as unresponsive and the siddhi applications deployed in that node will be rescheduled to another available resource node.
e.g.,heartbeatMaxRetry: 4
In the
minResourceCount
parameter, specify the minimum number of resource nodes required to operate the distributed setup. Siddhi applications are not deployed when the number of available resource nodes is less than the number specified here. The default value is1
.- If you are using NATS as messaging layer, specify the NATS server URLs used by the cluster. You can enter multiple URLs in the
natsServerUrl
parameter as a comma-separated list
(e.g.,<host_1>:<port_1>, <host_2>:<port_2>
). You also need to provide the ID of the cluster created in the NATS server via theclusterid
parameter. I f you are using Kafka as messaging layer, specify the Kafka server URLs used by the cluster via the
bootstrapURLs
parameter as a comma-separated list ( e.g.,<host_1>:<port_1>, <host_2>:<port_2>
).For the
zooKeeperURLs
parameter underzookeeper config
, specify the server URL of the zookeeper of the cluster in the format given below.
<ZOOKEEPER_HOST>:<ZOOKEEPER_PORT>
The following is an example of a
deployment.config
section configured as described above.
- In the
Configure resource nodes
To configure the resource nodes for a fully distributed HA cluster, edit the <SP_HOME>/conf/worker/deployment.yaml
file as follows. You have to uncomment (remove the # in front of each line) the section under # Sample of deployment.config for Distributed deployment. Now start performing following steps under deployment.config section.
- Uncomment
deployment.config
section of thedeployment.yaml
file. Then configure the resource nodes to communicate with the manager node by following the steps below.In the
type: distributedtype
field, enter the type of the cluster asdistributed
.
For the
httpsInterface
parameter, specify the host, port and the user credentials of the configuring resource node.The host must be the IP of the network interface through which the nodes are connected (i.e., LAN IP). If all the nodes are deployed in the same physical machine, each node must have a separate port.
e.g.,
host:localhost, port:9090, username:admin, password:admin
- In the
leaderRetryInterval
parameter, enter the number of milliseconds for which the resource node must keep retrying to connect with a manager node. If the time specified for this parameter elapses without the resource node connecting to a manager node, the resource node is shut down.
e.g.,leaderRetryInterval: 5000
In the
resourceManagers
parameter, specify the hosts, ports and user credentials of the manager nodes to which the resource node must try to connect. If there are multiple managers, a sequence must be specified.
Following is a sample deployment configuration for a resource node.deployment.config: type: distributed # required in both manager / resource httpsInterface: # required in both manager / resource host: 192.168.1.3 port: 9090 username: admin # username of current resource node password: admin # password of current resource node leaderRetryInterval: 10000 # only required in worker resourceManagers: # only required in worker - host: 192.168.1.1 port: 9543 username: admin # username of manager node password: admin # password of manager node - host: 192.168.1.2 port: 9543 username: admin # username of manager node password: admin # password of manager node
If you want to configure this noded as a receiver node instead of a resource node, you need to add the following parameter below the
type
parameter.isReceiverNode : true
This is the only difference between a resource node and a receiver node. The following is the sample configuration of a receiver node.
deployment.config: type: distributed isReceiverNode : true httpsInterface: host: 192.168.1.3 port: 9090 username: admin password: admin leaderRetryInterval: 10000 resourceManagers: - host: 192.168.1.1 port: 9543 username: admin # username of manager node password: admin # password of manager node - host: 192.168.1.2 port: 9543 username: admin # username of manager node password: admin # password of manager node
In order to retrieve the state of the Siddhi Applications deployed in the system in the event of a node failure, state persistence must be enabled for all worker nodes. This is done via database state persistance, sharing a common database. To do that first you need to define a new datasource to be used by persistance information under the
datasources
section of thedeployment.yaml
file. The following is a sample of such datasource configuration based on a MySQL database.- name: WSO2_PERSISTENCE_DB description: The datasource used for test database jndiConfig: name: jdbc/WSO2_PERSISTENCE_DB definition: type: RDBMS configuration: jdbcUrl: jdbc:mysql://localhost:3306/WSO2_PERSISTENCE_DB?useSSL=false username: root password: root driverClassName: com.mysql.jdbc.Driver maxPoolSize: 50 idleTimeout: 60000 connectionTestQuery: SELECT 1 validationTimeout: 30000 isAutoCommit: false
After this, you need to enable state persistance pointing to this database via the
org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore
persistence store. The following is a sample configuration. For detailed instructions, see Configuring Database and File System State Persistence.state.persistence: enabled: true intervalInMin: 3 revisionsToKeep: 2 persistenceStore: org.wso2.carbon.stream.processor.core.persistence.DBPersistenceStore config: datasource: WSO2_PERSISTENCE_DB table: PERSISTENCE_TABLE
Starting the cluster
To start the distributed SP cluster, follow the procedure below:
- Start each manager by navigating to the
<SP_HOME>/bin
directory and issuing the following command:
For Windows:manager.bat
For Linux :./manager.sh
- Start each worker by navigating to the
<SP_HOME>/bin
directory and issuing the following command:
For Windows:worker.bat
For Linux :./worker.sh
When both manager and resource nodes are successfully started, the following is printed in the log entry.
INFO {org.wso2.carbon.kernel.internal.CarbonStartupHandler} - WSO2 Stream Processor started in x sec
Siddhi applications should be deployed to the manager cluster using one of the following methods.
Dropping the
.siddhi
file in to the<SP_HOME>/wso2/manager/deployment/siddhi-files/
directory before or after starting the manager node.Sending a "POST" request to
http://<host>:<port>/siddhi-apps,
with the Siddhi App attached as a file in the request as shown in the example below. Refer Stream Processor REST API Guide for more information on using WSO2 Strean Processor APIs.Sample CURL request to deploy Siddhi applicationcurl -X POST "https://localhost:9543/siddhi-apps" -H "accept: application/json" -H "Content-Type: text/plain" -d @TestSiddhiApp.siddhi -u admin:admin -k
Important
To deploy Siddhi applications in Distributed deployment it is recommended to use a content synchronization mechanism since Siddhi applications must be deployed to both manager nodes. You can use a common shared file system such as Network File System (NFS) or any other shared file system that is available. You need to mount the<SP_HOME>/wso2/manager/deployment/siddhi-files/
directory of the two nodes to the shared file system.
- Start each manager by navigating to the