This documentation is for WSO2 CEP 2.1.0. View the home page of the latest release.

Stock Price Analyzer

Stock Quote sample is based on Stock Quotes of any Stock Exchange. In this sample WSO2 CEP will receive some stock quote information and it will give an output if price of the stock is greater than a predefined value.

Create Local Broker

Before creating the bucket to filter stock quotes it is essential to have a broker adopter. Since In this example we are going to use local Broker, it is needed to create a broker with type local.To do that

  1. Start CEP Server and login as admin. (Follow the installation guide for more information on product startup)
  2. In the Configure menu you can find a Menu item called "Broker" and under that click on 'Add' sub menu item.

  3. You will be redirected to a page with header "Create a New Broker" and you need to enter following details in that form to create a local broker.

    Broker Name : localBroker
    Broker Type : local
  4. Finally click on Add Broker button and you will get the added broker to the list of available brokers.

For more information about brokers visit here.

Create Bucket with Siddhi

To create a bucket use 'Add' sub menu item under CEP Buckets in the Main menu. Bucket creation form has three major sections. Basic information, Input and query. How to fill those sections is described below. If you need more information about buckets visit here.

CEP Main Menu

Section 1 : Basic Information

Use the following information to fill the basic information section as shown in the below screen-shot.

Bucket Name (Name of the bucket)               : XMLStockQuoteAnalyzer
Description (Description about the bucket)     : This bucket analyzes stock quotes and trigger an event if stock price >20

Engine Provider(CEP Runtime engine to be used) : SiddhiCEPRuntime [Choose from the drop down]

Persistence snapshot time interval in minutes  : 0
Enable distributed processing                  : false

Screenshot of bucket information section of Add bucket page

Section 2 : Inputs

This section is used to define the inputs CEP will receive. To add an input click on Add Input link and then use following details. Screen shot is provided below for your convenience.

 

Topic( topic to events be received) : AllStockQuotes
Broker Name (Broker to be used)     : localBroker

Mapping

Stream (Name of the event stream) : StockPriceStream
Query Event Type                  : Tuple
Input Mapping Type                : XML Mapping

XPath prefixes

Prefix    : quotedata
Namespace :  http://ws.cdyne.com/
Note : Click on add button to add the defined xpath and it will be appeared in the xpath definitions table once it added
Properties (these properties will be extracted from the received xml event and fed to the CEP engine)

Name  : symbol
xpath : //quotedata:StockQuoteEvent/quotedata:StockSymbol
type  : java.lang.String [Choose from the drop down 

Name  : price
xpath : //quotedata:StockQuoteEvent/quotedata:LastTradeAmount
type  : java.lang.Double [Choose from the drop down]
Note : Click on add button to add the defined property to the input and it will be appeared in the Properties table once it added
Note : After filling all required fields click on the add button to add Input to the Bucket. Once you clicked it will disappear the input form and the added input will be appeared in Inputs table.

Important

Same procedure can be used to receive inputs from WSO2 Message Broker. You need to create WS-event broker as described here and use that broker instead of localBroker when defining input as mentioned above. Then CEP will accept inputs from WSO2 Message Broker.

Section 3 : Queries

This section is used to define the queries which will run on inputs and define outputs. To add a query click on Add query link and use following information. Screen shot is provided below for your convenience.

Query Name (To identify the query) : StockDetector

Expression:
             from StockPriceStream 
             insert into highStocks     
             symbol,price  
             group by symbol     
             having ((price > 20));

Output(Define the output)

Topic          : highStocks
Broker Name    : localBroker

Output Mapping : XML Mapping

XML Mapping(Define the XML mapping)

XML Mapping Text :

             <quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/"
                                               xmlns:xsd="http://www.w3.org/2001/XMLSchema"
                                               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                        <quotedata:StockSymbol>{symbol}</quotedata:StockSymbol>
                        <quotedata:LastTradeAmount>{price}</quotedata:LastTradeAmount>
             </quotedata:StockQuoteDataEvent>
After filling all the required fields of the Query , click on add query button to add Query to the bucket. Once you clicked it will disappear the query form and added query will be appeared in Queries table.
As the last step of adding a bucket , click on save button. Once you clicked it will redirect to the deployed buckets page and deployed bucket will be appeared in buckets table. Further to check whether you have entered the details correctly you can click on the link with the defined bucket and it will show added details. You can come back to the previous adding bucket page by clicking on the back button provided or if needed to edit the bucket you can click on the "Edit" link on the top of the View bucket Page.

Important

Same procedure can be used to send output to WSO2 Message Broker. You need to create WS-event broker as described here and use that broker instead of localBroker when defining output as mentioned above. Then CEP will send output to WSO2 Message Broker.
For you convenience bucket is attached here. Download it and copy the file into CEP_HOME/repository/deployment/server/cepbuckets and start WSO2 CEP again.

Invoking Deployed Bucket

When the bucket is successfully deployed, an axis2 service will be automatically created. To send events to bucket we can use that service. Also we can output subscriber to handle outputs using the output topic name.

It will take sometime to deploy the new service. So please be patient.

Define output subscriber

When the user send events to CEP with this service, there should be a subscriber to the output topic given, to receive filtered events from the CEP. So before send events to the engine we need to have an axis2 service deployed which prints received results to the console. Then create a subscription to the output topic, with providing the URL of the axis2 service as the subscription URL.

Use following java class and deploy it as a web service in CEP server. As for this example name the web service as FastMovingStockQuoteReceiverService.

FastMovingStockQuoteReceiver
import org.apache.axiom.om.OMElement;

public class FastMovingStockQuoteReceiver {

    public void getOMElement(OMElement omElement){

        System.out.println(""+omElement.toString());

    }

}
Note : To deploy a web service in cep, create *.aar and copy it to wso2cep-2.0.0/repository/deployment/server/axis2services before starting the server.

You will be able to see the axis2 service in the services list.

Now you can create a subscription to the output topic defined when configuring the bucket.

For your convenience to deploy web service automatically goto CEP_HOME /samples/services/FastMovingStockQuoteReceiverService in console and type ant. This will deploy a web service in CEP.

Create topic and subscribe

In order to subscribe, you need to create a topic.

Step 1: Click on "Add" sub menu item under "Topics" Menu in Manage section of the left panel

Step 2: Specify the topic name in the provided text box , in this case topic name is : "highStocks"(output topic) and click on 'Add Topic' button. This will add the topic to the server and you will be directed to the Topic Browser page.

Step 3: Once you click on the topic in topic browser page you will be able see four links as in the above image. Click in the subscribe link and you will be directed to Subscribe page. Then use following details to fill it. Once you are done click the Subscribe button.

Topic             : highStocks (Output topic)
Subscription mode : Topic only
Subscription URL  : http://localhost:9763/services/FastMovingStockQuoteService/getOMElement
Expiration Time   : select a future date from calender

Screen shot of Subscribe Topic page

Step 4 : You can verify whether you have correctly subscribe to the topic by click on "Details" link of that topic in topic browser page. Once you click on that , you will be directed to the "topic details " page and there you will find all the subscriptions for that topic and its children (if exists) and permission on that topic. Apart from that with the publish section, you can publish a test xml message to that topic and check whether it is received to you subscription URL.

Sending events to CEP engine

Last step of invoking the bucket is sending events to CEP engine. As states earlier bucket service is deployed as an axis 2 service. Hence we can write a java client to sent requests to the axis 2 service. Following class will directly send events to axis2 service.

XMLStockQuoteClient
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.AXIOMUtil;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import javax.xml.stream.XMLStreamException;

public class XMLStockQuoteClient {
    public static void main(String[] args) {
        ServiceClient serviceClient = null;
        try {
            serviceClient = new ServiceClient();
            Options options = new Options();
            options.setTo(new EndpointReference("http://localhost:9763/services/localBrokerService/AllStockQuotes"));
            serviceClient.setOptions(options);

            if (serviceClient != null) {
                String xmlElement1 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                        + "                    <quotedata:StockQuoteEvent>\n" 
                        + "              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                        + "              <quotedata:LastTradeAmount>26.36</quotedata:LastTradeAmount>\n"
                        + "              <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                        + "              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                        + "              <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                        + "              <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                        + "              <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                        + "              <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                        + "              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                        + "              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                        + "              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                        + "              <quotedata:PE>10.88</quotedata:PE>\n"
                        + "              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                        + "              <quotedata:QuoteError>false</quotedata:QuoteError>\n" 
                        + "                    </quotedata:StockQuoteEvent>\n"
                        + "                </quotedata:AllStockQuoteStream>";

                String xmlElement2 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                        + "                    <quotedata:StockQuoteEvent>\n" 
                        + "              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                        + "              <quotedata:LastTradeAmount>15</quotedata:LastTradeAmount>\n"
                        + "              <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                        + "              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                        + "              <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                        + "              <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                        + "              <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                        + "              <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                        + "              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                        + "              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                        + "              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                        + "              <quotedata:PE>10.88</quotedata:PE>\n"
                        + "              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                        + "              <quotedata:QuoteError>false</quotedata:QuoteError>\n" 
                        + "                    </quotedata:StockQuoteEvent>\n"
                        + "                </quotedata:AllStockQuoteStream>";

                String xmlElement3 = "<quotedata:AllStockQuoteStream xmlns:quotedata=\"http://ws.cdyne.com/\">\n"
                        + "                    <quotedata:StockQuoteEvent>\n" 
                        + "              <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>\n"
                        + "              <quotedata:LastTradeAmount>36</quotedata:LastTradeAmount>\n"
                        + "              <quotedata:StockChange>0.05</quotedata:StockChange>\n"
                        + "              <quotedata:OpenAmount>25.05</quotedata:OpenAmount>\n"
                        + "              <quotedata:DayHigh>25.46</quotedata:DayHigh>\n"
                        + "              <quotedata:DayLow>25.01</quotedata:DayLow>\n"
                        + "              <quotedata:StockVolume>20452658</quotedata:StockVolume>\n"
                        + "              <quotedata:PrevCls>25.31</quotedata:PrevCls>\n"
                        + "              <quotedata:ChangePercent>0.20</quotedata:ChangePercent>\n"
                        + "              <quotedata:FiftyTwoWeekRange>22.73 - 31.58</quotedata:FiftyTwoWeekRange>\n"
                        + "              <quotedata:EarnPerShare>2.326</quotedata:EarnPerShare>\n"
                        + "              <quotedata:PE>10.88</quotedata:PE>\n"
                        + "              <quotedata:CompanyName>Microsoft Corpora</quotedata:CompanyName>\n"
                        + "              <quotedata:QuoteError>false</quotedata:QuoteError>\n" 
                        + "                    </quotedata:StockQuoteEvent>\n"
                        + "                </quotedata:AllStockQuoteStream>";

                OMElement omElement1 = null;
                OMElement omElement2 = null;
                OMElement omElement3 = null;
                try {
                    omElement1 = AXIOMUtil.stringToOM(xmlElement1);
                    omElement2 = AXIOMUtil.stringToOM(xmlElement2);
                    omElement3 = AXIOMUtil.stringToOM(xmlElement3);
                    serviceClient.fireAndForget(omElement1);
                    serviceClient.fireAndForget(omElement2);
                    serviceClient.fireAndForget(omElement3);
                    Thread.sleep(500); // We need to wait some time for the
                                        // message to be sent
                } catch (XMLStreamException e) {
                    e.printStackTrace(); // To change body of catch statement
                                            // use File | Settings | File
                                            // Templates.
                } catch (AxisFault axisFault) {
                    axisFault.printStackTrace(); // To change body of catch
                                                    // statement use File |
                                                    // Settings | File
                                                    // Templates.
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}

Above client will send a request to the deployed web service explained earlier.

For your convenience we have provided the class for you. To run the class go to CEP_HOME /samples/cep-samples in console and enter   ant xmlStockQuotePublisher. This will invoke the class.

When you run this class, if you have subscribed correctly to the output topic of the query, you will be able to see the filtered events in the console as below.

<quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/"
                xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
                <quotedata:LastTradeAmount>26.36</quotedata:LastTradeAmount>
</quotedata:StockQuoteDataEvent>

<quotedata:StockQuoteDataEvent xmlns:quotedata="http://ws.cdyne.com/"
                xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
                <quotedata:StockSymbol>MSFT</quotedata:StockSymbol>
                <quotedata:LastTradeAmount>36</quotedata:LastTradeAmount>
</quotedata:StockQuoteDataEvent>

Copyright © WSO2 Inc. 2005-2014