This documentation is for WSO2 CEP 2.1.0. View the home page of the latest release.

Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Current »

This sample demonstrates how Siddhi engine can be used in the distributed mode.

In this sample CEP will receive phone retail store information in several CEP nodes and publish retail summery on RetailSummary topic.

from phoneRetailStream#window.time(1200000)
insert into retailSummaryStream
count(brand) as purchaseOrders ,sum(quantity) as quantitySold, sum(totalPrice) as revenueEarned, avg(totalPrice) as avgRevenue 

In this sample we will publish events using a custom Data-Bridge data publisher and the output events from the bucket will be published to a JMS subscriber to receive and log those events.

Following is the configuration used in this sample.

<bucket name="DistributedPurchaseAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
Calculate the last 20 minutes total purchases summary.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
        <property name="siddhi.enable.distributed.processing">true</property>
    </engineProviderConfiguration>
    <input topic="org.wso2.phone.retail.store/1.2.0" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream"  <!--queryEventType="Tuple"--> >
            <property name="brand" inputName="brand" inputDataType="payloadData"
                      type="java.lang.String"/>
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
                      type="java.lang.String"/>
        </tupleMapping>
    </input>
    <query name="AllStocksQuery">
        <expression >
from phoneRetailStream#window.time(1200000)
insert into retailSummaryStream
count(brand) as purchaseOrders ,sum(quantity) as quantitySold, sum(totalPrice) as revenueEarned, avg(totalPrice) as avgRevenue
        </expression>
        <output topic="RetailSummary" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:RetailSummaryEvent
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:PurchaseOrders>{purchaseOrders}</quotedata:PurchaseOrders>
                    <quotedata:QuantitySold>{quantitySold}</quotedata:QuantitySold>
                    <quotedata:RevenueEarned>{revenueEarned}</quotedata:RevenueEarned>
                    <quotedata:AvgRevenuePerPurchase>{avgRevenue}</quotedata:AvgRevenuePerPurchase>
                </quotedata:RetailSummaryEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>

Prerequisites

  • Apache Ant to build & deploy the Sample & Service, and to run the client. Refer Installation Prerequisites for instructions to install Apache Ant.
  • ActiveMQ JMS Broker to subscribe to the output events. Refer Installation Prerequisites for instructions to install ActiveMQ JMS Broker.

Deploying the configurations on 1st CEP node

We will also used this node to run the sample clients

The steps are as follows :

  1. Install the WSO2 Complex Event Processor, but do not start the server Refer to the Installation and Deployment for instructions.
  2. Copy paste activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to <CEP_HOME>/samples/lib directory.  (to run sample clients)

  3. Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to <CEP_HOME>/repository/components/lib directory.

  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, type ant deploy-distributed,
    This will copy the broker-manager-config.xml to <CEP_HOME>/repository/conf directory and the bucket configuration to <CEP_HOME>/repository/deployment/server/cepbuckets directory.

Deploying the configurations on 2nd CEP node

The steps are as follows :

  1. Install the WSO2 Complex Event Processor, but do not start the server Refer to the Installation and Deployment for instructions.
  2. Copy paste activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to <CEP_HOME>/repository/components/lib directory.

  3. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  4. From there, type ant deploy-distributed,
    This will copy the broker-manager-config.xml to <CEP_HOME>/repository/conf directory and the bucket configuration to <CEP_HOME>/repository/deployment/server/cepbuckets directory.

Starting JMS subscriber

The steps are as follows :  

  1. Start ActiveMQ JMS Broker. Refer Installation Prerequisites for instructions to run ActiveMQ JMS Broker.
  2. In a new command prompt, switch to the sample directory of the 1st CEP node: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  3. From there, type ant jmsSubscriber -Dtopic=RetailSummary, this will subscribe to the RetailSummary topic of the ActiveMQ Broker receiving the output events of CEP.

Publishing events to 1st CEP node

The steps are as follows :

  1. In a command prompt, switch to the CEP samples directory of the 1st CEP node: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant agentPhoneRetailClient -Dport=7611 -Devents=2
    This will send 2 thrift events to CEP.

Observation

You will be able observe the output events in the JMS subscriber console as shown bellow.

Note

Observe how the PurchaseOrders, QuantitySold and the RevenueEarned fields increases according to the input events.

Publishing events to 2nd CEP node

The steps are as follows :

  1. In a command prompt, switch to the CEP samples directory of the 2nd CEP node:: <CEP_HOME>/samples/cep-samples 
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant agentPhoneRetailClient -Dport=7612 -Devents=2
    This will send 2 thrift events to CEP.

Observation

You will be able observe the output events sent by the 2nd CEP node in the JMS subscriber console as shown bellow. Observe how the PurchaseOrders, QuantitySold and the RevenueEarned fields has incensed according to the input event.

Observe how the PurchaseOrders, QuantitySold and the RevenueEarned fields increases according to the input events.

  • No labels