Creating a Storm Based Distributed Execution Plan
When WSO2 CEP is run is a distributed set up with Apache Storm, Siddhi queries are executed inside Siddhi engines embedded in the bolts of the Apache Storm topology (for detailed information about Apache Storm topology, see Apache Storm Tutorial).
Creating a distributed execution plan
Execution plans in a CEP distributed set up should be created in the Management Console of a CEP manager. The Management consoles of CEP workers cannot be used for this purpose.
The procedure for creating a Storm based distributed execution plan is the same as creating a stand alone execution plan.
Supported annotations
The annotations supported for Apache Storm distributed execution plans are as follows.
Annotation | Description | Example |
---|---|---|
@dist (parallel='<number of Storm tasks>) | The number of storm tasks in which the query should be run parallel. | @dist(parallel='4') |
@dist(execGroup='name of the group')
| All the Siddhi queries in a particular execGroup will be executed in a single Siddhi bolt. |
@dist(execGroup='Filtering')
|
@Plan:dist(receiverParallelism='number of receiver spouts')
| The number of event receiver spouts to be spawned for the Storm topology. | @Plan:dist(receiverParallelism='1') |
@Plan:dist(publisherParallelism='number of publisher bolts')
| The number of event publisher bolts to be spawned for the Storm topology. | @Plan:dist(publisherParallelism='4')
|
The following is an example of an execution plan populated with the annotations mentioned above.
Once an execution plan is created as saved, it's configuration in WSO2 Siddhi Query Language format is saved in the <CEP_HOME>/repository/deployment/server/executionplans
directory.
Every Siddhi query in a particular
execGroup
should have the same number of tasks as shown in the execution plan above ( e.g., parallel = '4'
). If the queries need to be distributed across different siddhi bolts, the execGroup
names of the queries should differ from each other.Generating the Apache Storm topology
Once an execution plan is created and submitted, it is converted to a query plan which defines the components and layout of the storm topology generated for the execution plan in XML. The query plan contains configurations such as input/output streams, number of instances to be spawned which siddhi queries to be executed inside(for bolts), etc.
If you want to view the query plan, add log4j.logger.org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager=DEBUG
in the <CEP_HOME>/repository/conf/log4j.properties
file.
Once the storm topology is generated based on the query plan, it is submitted to Apache Storm together with the org.wso2.cep.storm.dependencies.jar
file. This jar file contains the classes required to run the topology in the Storm cluster. It consists of dependencies such as Siddhi, etc. The Nimbus (i.e. the manager of storm cluster) distributes it across the storm cluster. This jar file is located in the <CEP_HOME>/repository/conf/cep/storm
directory.
Once the topology is successfully submitted to Apache Storm, a log similar to the following example is printed in the wso2carbon.log file of the CEP manager.
Naming execution plans
A name can be specified for each execution plan with the @Plan:name
annotation. The CEP manager creates the Storm topology with the name specified in this annotation. In addition, the tenant ID is appended to the name of the topology in order to avoid name collisions of storm topologies belonging to different tenants in a multi-tenanted environment.
For example, if an execution plan is created with the @Plan:name(‘StockAnalysis’)
annotation for the super-tenant with tenant ID 1234
, the Storm topology can be viewed as follows in the Storm UI.
Checking the status of the execution plan
Before publishing data to an event flow with a distributed execution plan, check whether the following conditions are met to ensure that the execution plan is ready to process data.
- The associated storm topology is in the
ACTIVE
state in the storm cluster. - All connections between the CEP workers and the event receiver spouts (inflow connections) are made.
- All connections between the event publisher bolts and CEP workers (outflow connections) are made.
To check whether the above conditions are met, click Main => Execution Plans in the Management Console of the relevant CEP manager. The status of the execution plan is displayed in the Distributed Deployment Status column as shown in the example below.
Things to note
- If you change the name of an execution plan after creating and saving it, remove the Storm topology generated with the previous name from the Apache Storm UI.
- The number of execution plans that are allowed to be created in a CEP distributed deployment is determined by the number of slots (i.e. workers) in the Storm cluster. Each execution plan by default requires at least one slot to deploy its Storm topology. This value can be changed by adding the following configuration to the <CEP_HOME>repository/conf/cep/storm/storm.yaml file.
topology.workers : <number>
For example, if you add the configurationtopology.workers : 2
, and the number of slots in the cluster is 10, then the maximum number of execution plans allowed to be created for the cluster is 5 (10/2). - At present, there is no way to control how bolts/spouts are spread across the Storm cluster. It is carried out in the Round Robin method by the default Storm Scheduler.
See the following samples for more information.