Configuring an Active-Active Multi Datacenter Deployment
In this deployment, both data centers function as active nodes. Therefore, both data centers process and publish events simultaneously.
Configuring the multi DC setup
To configure a multi DC set-up, follow the steps below:
- Set up two working SP clusters. Each cluster can be either a minimum HA cluster or a fully distributed cluster.
- Download and install a message broker to achieve guarenteed delivery and other QoS (Quality of Service) characteristics such as processing each event exactly once, ordering of events, etc. Deploy two instances of this server in each datacenter. When sending messages to this WSO2 SP setup, the publisher should send events to both brokers so that both datacenters build their state simultaneously. This ensures that if one datacenter fails, the messages lost as a result can be fetched from the broker of the other datacenter. In this example, a Kafka message broker is set up. For detailed instructions to set up a Kafka message broker, see Kafka Documentation.
- To create a Siddhi application that sends messages to both the clusters, follow the steps below.
Add a name for the Siddhi application as shown in the example below.
@App:name('SweetTotalApp')
Let's consider a simple scenario where information relating to the production in a factory is recorded. The sum of the production amount for each product is recorded and published as the output. For this, add the definitions for the required input and output streams, and the query as shown below.
@App:
name
(
'SweetTotalApp'
)
define stream SweetProductionStream (
name
string, amount long);
define stream SweetTotalStream(
name
string, totalProduction long);
@info(
name
=
'SweetTotalQuery'
)
from
SweetProductionStream
select
name
,
sum
(amount)
as
totalProduction
group
by
name
insert
into
SweetTotalStream;
To receive the events to be processed, add a
KafkaMultiDC
event source to the input stream definition (in this example,SweetProductionStream
) as shown below.@App:name('SweetTotalApp')
@source(type=kafkaMultiDC, bootstrap.servers='host1:9092, host2:9092')
define stream SweetProductionStream (name string, amount long);
define stream SweetTotalStream(name string, totalProduction long);
@info(name='SweetTotalQuery')
from SweetProductionStream
select name, sum(amount) as totalProduction
group by name
insert into SweetTotalStream;
In this multi datacenter scenario, the input events need to be received by two clusters. This is facilitated via the
kafkaMultiDC
source type. Once you select this source type, you can specify the two hosts to which the input events need to be sent via thebootstrap.servers
parameter. For more information about this source type, see Siddhi Extensions Documentation - kafkaMultiDC Source.To publish the output, connect a sink of the required type. In this example, let's assume that they are published as logs.
@App:name('SweetTotalApp')
@source(type=kafkaMultiDC, bootstrap.servers='host1:9092, host2:9092')
define stream SweetProductionStream (name string, amount long);
@sink(type=
'log'
, prefix=
'Sweet Totals:'
)
define stream SweetTotalStream(name string, totalProduction long);
@info(name='SweetTotalQuery')
from SweetProductionStream
select name, sum(amount) as totalProduction
group by name
insert into SweetTotalStream;
Save the changes. The completed Siddhi application is as follows.
@App:name('SweetTotalApp') @source(type=kafkaMultiDC, bootstrap.servers='host1:9092, host2:9092') define stream SweetProductionStream (name string, amount long); @sink(type='log', prefix='Sweet Totals:') define stream SweetTotalStream(name string, totalProduction long); @info(name='SweetTotalQuery') from SweetProductionStream select name, sum(amount) as totalProduction group by name insert into SweetTotalStream;
- Deploy this Siddhi application in the production environment for each datacenter as follows.
- Place the Siddhi application in the
<SP_HOME>/wso2/worker/deployment/siddhi-files
directory of all the required nodes. - Start all the worker nodes of both the clusters. For more information, see Deploying Streaming Applications.
- Place the Siddhi application in the
- If a failure occurs, the datacenter that is still active must be able to start publishing messages from where the active datacenter stopped before failing. To allow this, open the
<SP_HOME>/conf/worker/deployment.yaml
file, and under thestate.persistence
section, set theenabled
parameter totrue
. Then specify the required information about the data store in which the messages are persisted. For detailed instructions, see Configuring Database and File System State Persistence. - If you are including aggregation queries in your Siddhi applications, they need to be partitioned because the processing is carried out by both data centers. To do this, follow the steps below.
In this scenario, assume that the sweet totals need to be calculated per hour, day, month, and year. To do this, let's define an aggregation as follows.
@App:name('SweetTotalApp') @source(type=kafkaMultiDC, bootstrap.servers='host1:9092, host2:9092') define stream SweetProductionStream (name string, amount long); @sink(type='log', prefix='Sweet Totals:') define stream SweetTotalStream(name string, totalProduction long);
define aggregation SweetProductionAggregationfrom SweetProductionStream
select name, sum(amount) as totalAmount
group by name
aggregate every hour...year @info(name='SweetTotalQuery') from SweetProductionStream select name, sum(amount) as totalProduction group by name insert into SweetTotalStream;
For more information about aggregations, see Incremental Analysis.
To store the time-based aggregations calculated via the aggregation you added above, define a data store as follows.
Both data centers need to be connected to the same database.
@store(type=
'rdbms'
, jdbc.url=
"jdbc:mysql://localhost:3306/TestDB"
, username=
"root"
,
password
=
"root"
, jdbc.driver.
name
=
"com.mysql.jdbc.Driver"
)
define aggregation SweetProductionAggregationfrom SweetProductionStream
select name, sum(amount) as totalAmount
group by name
aggregate every hour...year
To allow the aggregation to be partitioned, set the
@PartitionById(enable=
'true'
)
property as follows.@store(type=
'rdbms'
, jdbc.url=
"jdbc:mysql://localhost:3306/TestDB"
, username=
"root"
,
password
=
"root"
, jdbc.driver.
name
=
"com.mysql.jdbc.Driver"
)
@PartitionById(enable=
'true'
)
define aggregation SweetProductionAggregationfrom SweetProductionStream
select name, sum(amount) as totalAmount
group by name
aggregate every hour...year
In order top make it possible for the aggregations to be partitioned by the node ID, a unique ID needs to be defined for each node in each cluster of the multi data senter set up. This ID is defined in the
<SP_HOME>/conf/<PROFILE/deployment.yaml
file. For more information, See Incremental Analysis - Scaling through distributed aggregations.
The input information to be processed needs to be received by both the clusters. To do enable this, create another Siddhi application that functions in the same manner as an external client by following the steps below.
This step is optional. You can alternatively configure your external client to publish events to both the datacenters.
Add a name for the Siddhi application as shown in the example below.
@App:name('ProductionTotalReportingApp')
To capture the input information (in this example, the name of the item produced and the quantity produced), add an input stream definition as shown below.
@App:name('ProductionTotalReportingApp')
define stream GetProductionAmountStream(name string, totalProduction long);
Let's assume that input data is received to this stream via HTTP and connect a source to this stream as follows.
@App:name('ProductionTotalReportingApp')
@source(type=
'http'
, receiver.url=
'http://localhost:5005/SweetProductionEP'
, @map(type =
'json'
))
define stream GetProductionAmountStream(name string, totalProduction long);
This Siddhi application needs to publish the information it receives without any further processing. Therefore, add an output stream definition with the same schema as shown below.
@App:name('ProductionTotalReportingApp')
@source(type=
'http'
, receiver.url=
'http://localhost:5005/SweetProductionEP'
, @map(type =
'json'
))
define stream GetProductionAmountStream(name string, totalProduction long);
define stream SweetProductionStream(name string, totalProduction long);
Now add a Siddhi query as follows to direct the data from the input stream to the output stream as follows.
@App:name('ProductionTotalReportingApp')
@source(type=
'http'
, receiver.url=
'http://localhost:5005/SweetProductionEP'
, @map(type =
'json'
)
define stream GetProductionAmountStream(name string, totalProduction long);
@sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id=’sink1’, @map(type='xml'))
define stream SweetTotalStream(name string, totalProduction long);
from GetProductionAmountStream
select name, amount insert into SweetProductionStream;
- The information needs to be published to Kafka topics to which the nodes of the two datacenters have subscribed. To do this, connect a sink of the
kafkaMultiDC
type to the output stream (in this example,SweetProductionStream
) as shown below.In this configuration,
kafkaMultiDC
is selected as the sink type because it supports the following:- As mentioned before, the multi DC setup uses a message broker to achieve guarenteed delivery and other QoS (Quality of Service) characteristics. The sink of the
kafkaMultiDC
type allows you to refer to the topic of the message broker you configured via thetopic
parameter. - The servers to which the events are published are specified as a comma-separated list via the
bootstrap.servers
parameter - A unique sequence ID can be assigned to each event via the
sequence.id
parameter. These are assigned for the following purposes.- No duplicated events must be created in the recovering datacenter when it fetches the lost events from the broker in the other datacenter. Assigning a unique sequence ID to each event allows the recovering node to identify duplicated events, and ensure that each event is processed only once to avoid an unnecessary system overhead.
- When one datacenter fails, the other datacenter must be able to identify the messages that are not already published,. This can be achieved by assigning each event a unique sequence ID.
For more information about this source type, see Siddhi Extensions Documentation - kafkaMultiDC Sink.
The completed Siddhi application is as follows.@App:name('ProductionTotalReportingApp') @source(type='http', receiver.url='http://localhost:5005/SweetProductionEP', @map(type = 'json')) define stream GetProductionAmountStream(name string, totalProduction long); @sink(type='kafkaMultiDC', topic='myTopic', bootstrap.servers='host1:9092, host2:9092', sequence.id=’sink1’, @map(type='xml')) define stream SweetTotalStream(name string, totalProduction long); from GetProductionAmountStream select name, amount insert into SweetProductionStream;
- As mentioned before, the multi DC setup uses a message broker to achieve guarenteed delivery and other QoS (Quality of Service) characteristics. The sink of the
Save this Siddhi Application. Then deploy it in a separate SP pack by placing it in the
<SP_HOME>/wso2/worker/deployment/siddhi-files
directory.This SP pack should be a separate pack that can be run externally to the multi DC cluster.
Run the SP instance in which you deployed the Siddhi application by issuing one of the following commands from the
<SP_HOME>/bin
directory.- For Windows:
editor.bat
- For Linux :
./editor.sh
- For Windows:
To process information via this setup, collect the required events via the
ProductionTotalReportingApp
Siddhi application. For more information, see Collecting Events.