This documentation is for WSO2 Complex Event Processor 2.0.1.

Long Running Purchase Analyzer

This sample demonstrates how the Siddhi engine can be used in persistence mode for long-running queries that span a period much longer than the server uptime.

In this sample, CEP receives phone retail store information and publishes a retail summary on the RetailSummary topic.

from phoneRetailStream#window.time(1200000)
insert into retailSummaryStream
count(brand) as purchaseOrders ,sum(quantity) as quantitySold, sum(totalPrice) as revenueEarned, avg(totalPrice) as avgRevenue 
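
To make the query's behavior concrete, the following is a minimal Python sketch of a sliding time window that keeps the events from the last 1200000 milliseconds (20 minutes) and recomputes the four aggregates on every arrival. It is an illustration only, not Siddhi code: the class name RetailSummaryWindow, the brand values, and the choice to expire old events only when a new event arrives are all assumptions made for this example.

```python
from collections import deque

WINDOW_MS = 1_200_000  # 20 minutes, matching window.time(1200000)


class RetailSummaryWindow:
    """Hypothetical stand-in for the sliding time window; not Siddhi code."""

    def __init__(self, window_ms=WINDOW_MS):
        self.window_ms = window_ms
        self.events = deque()  # entries are (timestamp_ms, quantity, total_price)

    def add(self, timestamp_ms, brand, quantity, total_price):
        # Assumption: expiry is checked only on arrival of a new event.
        while self.events and self.events[0][0] <= timestamp_ms - self.window_ms:
            self.events.popleft()
        # brand is only counted, so it does not need to be stored.
        self.events.append((timestamp_ms, quantity, total_price))
        return self.summary()

    def summary(self):
        count = len(self.events)
        revenue = sum(e[2] for e in self.events)
        return {
            "purchaseOrders": count,                          # count(brand)
            "quantitySold": sum(e[1] for e in self.events),   # sum(quantity)
            "revenueEarned": revenue,                         # sum(totalPrice)
            "avgRevenue": revenue / count if count else 0.0,  # avg(totalPrice)
        }
```

Each call to add returns the summary that would be inserted into retailSummaryStream for that arrival, so the counts grow while events stay inside the window and shrink once they age out.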

In this sample, events are published to CEP using a custom Data-Bridge data publisher, and the output events from the bucket are published to a JMS topic, where a JMS subscriber receives and logs them.

The following configuration is used in this sample.

<bucket name="LongRunningPurchaseAnalyzer" xmlns="http://wso2.org/carbon/cep">
    <description>
Calculate the last 20 minutes total purchases summary.
    </description>
    <engineProviderConfiguration engineProvider="SiddhiCEPRuntime">
        <property name="siddhi.persistence.snapshot.time.interval.minutes">1</property>
        <property name="siddhi.enable.distributed.processing">false</property>
    </engineProviderConfiguration>
    <input topic="org.wso2.phone.retail.store/1.2.0" brokerName="localAgentBroker">
        <tupleMapping stream="phoneRetailStream"> <!-- optional attribute: queryEventType="Tuple" -->
            <property name="brand" inputName="brand" inputDataType="payloadData"
                      type="java.lang.String"/>
            <property name="quantity" inputName="quantity" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="totalPrice" inputName="total" inputDataType="payloadData"
                      type="java.lang.Integer"/>
            <property name="buyer" inputName="buyer" inputDataType="payloadData"
                      type="java.lang.String"/>
        </tupleMapping>
    </input>
    <query name="AllStocksQuery">
        <expression >
from phoneRetailStream#window.time(1200000)
insert into retailSummaryStream
count(brand) as purchaseOrders ,sum(quantity) as quantitySold, sum(totalPrice) as revenueEarned, avg(totalPrice) as avgRevenue
        </expression>
        <output topic="RetailSummary" brokerName="activemqJmsBroker">
            <xmlMapping>
                <quotedata:RetailSummaryEvent
                        xmlns:quotedata="http://ws.cdyne.com/">
                    <quotedata:PurchaseOrders>{purchaseOrders}</quotedata:PurchaseOrders>
                    <quotedata:QuantitySold>{quantitySold}</quotedata:QuantitySold>
                    <quotedata:RevenueEarned>{revenueEarned}</quotedata:RevenueEarned>
                    <quotedata:AvgRevenuePerPurchase>{avgRevenue}</quotedata:AvgRevenuePerPurchase>
                </quotedata:RetailSummaryEvent>
            </xmlMapping>
        </output>
    </query>
</bucket>
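
The xmlMapping element above acts as a template in which each {name} placeholder is replaced by the query output attribute of the same name. The Python sketch below illustrates that substitution and shows what a subscriber might read back from the resulting XML. How CEP performs the substitution internally is not described here, so the functions render_event and parse_fields, and the sample attribute values used with them, are hypothetical.

```python
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape

# Template copied from the xmlMapping element of the bucket configuration.
TEMPLATE = """<quotedata:RetailSummaryEvent xmlns:quotedata="http://ws.cdyne.com/">
    <quotedata:PurchaseOrders>{purchaseOrders}</quotedata:PurchaseOrders>
    <quotedata:QuantitySold>{quantitySold}</quotedata:QuantitySold>
    <quotedata:RevenueEarned>{revenueEarned}</quotedata:RevenueEarned>
    <quotedata:AvgRevenuePerPurchase>{avgRevenue}</quotedata:AvgRevenuePerPurchase>
</quotedata:RetailSummaryEvent>"""


def render_event(template, attributes):
    """Replace each {name} placeholder with the matching attribute value."""
    rendered = template
    for name, value in attributes.items():
        rendered = rendered.replace("{" + name + "}", escape(str(value)))
    return rendered


def parse_fields(rendered):
    """Parse the rendered XML and return {local element name: text}."""
    root = ET.fromstring(rendered)
    # Tags look like "{http://ws.cdyne.com/}PurchaseOrders"; keep the local part.
    return {child.tag.split("}", 1)[1]: child.text for child in root}
```

For example, rendering the template with purchaseOrders=5 and then calling parse_fields on the result yields a dictionary whose PurchaseOrders entry is the string "5".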

Prerequisites

  • Apache Ant to build and deploy the sample and service, and to run the client. Refer to Installation Prerequisites for instructions to install Apache Ant.
  • ActiveMQ JMS Broker to subscribe to the output events. Refer to Installation Prerequisites for instructions to install the ActiveMQ JMS Broker.

Deploying the configurations

The steps are as follows:

  1. Install the WSO2 Complex Event Processor, but do not start the server. Refer to Installation and Deployment for instructions.
  2. Copy activemq-all-xxx.jar from the <ActiveMQ_HOME> directory to the <CEP_HOME>/samples/lib directory.
  3. Copy activemq-core-xxx.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar from <ActiveMQ_HOME>/lib to the <CEP_HOME>/repository/components/lib directory.
  4. In a command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  5. From there, type ant deploy-persistence
    This will copy broker-manager-config.xml to the <CEP_HOME>/repository/conf directory and the bucket configuration to the <CEP_HOME>/repository/deployment/server/cepbuckets directory.
  6. Start the ActiveMQ JMS Broker. Refer to Installation Prerequisites for instructions to run the ActiveMQ JMS Broker.

Starting JMS subscriber

The steps are as follows:

  1. In a new command prompt, switch to the sample directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant jmsSubscriber -Dtopic=RetailSummary
    This will subscribe to the RetailSummary topic of the ActiveMQ broker and receive the output events of CEP.

Running the Client

The steps are as follows:

  1. In a command prompt, switch to the CEP samples directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant agentPhoneRetailClient -Devents=5
    This will send five Thrift events to CEP.

Observation

You will be able to observe the output events in the JMS subscriber console.

Note

Observe how the PurchaseOrders, QuantitySold, and RevenueEarned fields increase according to the input events. Because of batch processing, some of these entries might not be printed, but you will always receive the most up-to-date values.

Gracefully restarting CEP server

The steps are as follows:

  1. Sign in. Enter your user name and password to log on to the Complex Event Processor Management Console.

  2. Click the "Shutdown/Restart Server" menu item under the Manage section of the left panel.
    This will redirect you to the Shutdown/Restart Server page.

  3. Click "Graceful Restart", and then click "Yes" to confirm the server restart when prompted.

  4. Observe the server logs to check whether the server has restarted properly.

Rerunning the Client

The steps are as follows:

  1. In a command prompt, switch to the CEP samples directory: <CEP_HOME>/samples/cep-samples
    For example, in Linux: cd <CEP_HOME>/samples/cep-samples
  2. From there, type ant agentPhoneRetailClient -Devents=1
    This will send one Thrift event to CEP.

Observation

You will be able to observe the output events in the JMS subscriber console. Observe how the PurchaseOrders, QuantitySold, and RevenueEarned fields have increased according to the input event, continuing from the values reached before the restart.
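
The effect of persistence across the restart can be sketched in Python as follows. The sketch models the window state being saved to a snapshot and restored into a new instance, which stands in for the restarted server. It assumes that a snapshot was taken after the first five events (the sample sets siddhi.persistence.snapshot.time.interval.minutes to 1) and that the new event arrives within the 20-minute window. The class PersistentWindow, its JSON snapshot format, and the event values are hypothetical and do not reflect how Siddhi stores its state.

```python
import json
from collections import deque


class PersistentWindow:
    """Hypothetical model of a time window whose state can be snapshotted."""

    def __init__(self, window_ms=1_200_000):
        self.window_ms = window_ms
        self.events = deque()  # entries are [timestamp_ms, quantity, total_price]

    def add(self, timestamp_ms, quantity, total_price):
        # Expire events that have fallen out of the time window.
        while self.events and self.events[0][0] <= timestamp_ms - self.window_ms:
            self.events.popleft()
        self.events.append([timestamp_ms, quantity, total_price])
        return {
            "purchaseOrders": len(self.events),
            "quantitySold": sum(e[1] for e in self.events),
            "revenueEarned": sum(e[2] for e in self.events),
        }

    def snapshot(self):
        """Serialize the window contents, as a periodic snapshot might."""
        return json.dumps({"window_ms": self.window_ms, "events": list(self.events)})

    @classmethod
    def restore(cls, data):
        """Rebuild a window from a snapshot, as after a server restart."""
        state = json.loads(data)
        window = cls(state["window_ms"])
        window.events = deque(state["events"])
        return window
```

In this model, a window restored from the snapshot reports six purchase orders after one more event, whereas a fresh window without the snapshot would report only one. That difference is what the rerun of the client is meant to demonstrate.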

 

 

Copyright © WSO2 Inc. 2005-2014