This documentation is for WSO2 CEP 2.1.0.

Access Analyzer

The Access Analyzer sample is based on the user access data of an organization. In this sample, WSO2 CEP receives a set of access records and emits an output event whenever the organization name matches a predefined name. Both inputs and outputs are sent as tuples, and tuple mapping is used.

Create Agent Broker

Before creating the bucket that analyzes user access, a broker must be available. Because this sample uses agent brokers, create a broker of type agent as follows:

  1. Start the CEP server. Refer to Installing the Product for instructions.
  2. Log in as admin.
  3. In the Configure menu, find the "Broker" menu item and click its "Add" sub-menu.
  4. On the page titled "Create a New Broker", enter the following details to create an agent broker.

     Broker Name       : localAgentBroker 
     Type              : agent
     URL               : tcp://localhost:7611
     Authenticator URL : ssl://localhost:7711          
     User Name         : admin
     Password          : admin
  5. Finally, click the Add Broker button. The new broker appears in the list of available brokers.
  6. Repeat steps 3 to 5 to create a second broker with the following details.

     Broker Name       : externalAgentBroker 
     Type              : agent 
     URL               : tcp://localhost:7661    
     Authenticator URL : ssl://localhost:7761      
     User Name         : admin
     Password          : admin

    For more information, refer to the brokers documentation.
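
Both brokers above follow the same pattern: the authenticator port is the receiver port plus 100 (7611 and 7711, 7661 and 7761). The following is a minimal sketch of that convention in plain Java. The class BrokerUrls and its method are hypothetical helpers written for illustration and are not part of WSO2 CEP.

```java
import java.net.URI;

// Hypothetical helper (not a WSO2 API): derives the authenticator URL of an
// agent broker from its receiver URL using the "port + 100" convention.
public class BrokerUrls {

    // Offset taken from the values used in this sample (7611 -> 7711).
    public static final int AUTH_PORT_OFFSET = 100;

    public static String authenticatorUrl(String receiverUrl) {
        URI uri = URI.create(receiverUrl);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException(
                    "Expected a URL such as tcp://host:port but got " + receiverUrl);
        }
        // The authenticator listens over SSL on the same host.
        return "ssl://" + uri.getHost() + ":" + (uri.getPort() + AUTH_PORT_OFFSET);
    }

    public static void main(String[] args) {
        System.out.println(authenticatorUrl("tcp://localhost:7611")); // localAgentBroker
        System.out.println(authenticatorUrl("tcp://localhost:7661")); // externalAgentBroker
    }
}
```

If a broker is moved to a different receiver port, the authenticator URL entered in the form would be expected to change accordingly.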

Create Bucket with Siddhi

To create a bucket, use the Add menu item under CEP Buckets in the Main menu. The bucket creation form has three major sections: Basic Information, Inputs, and Queries. The sections below describe how to fill in each one. For more information, refer to the buckets documentation.

Section 1 : Basic Information

Use the following information to fill in the basic information section.

Bucket Name (Name of the bucket)               : AccessAnalysisBucket
Description (Description about the bucket)     : Analyses user access

Engine Provider(CEP Runtime engine to be used) : SiddhiCEPRuntime [Choose from the drop down]

Persistence snapshot time interval in minutes  : 0
Enable distributed processing                  : false

Section 2 : Inputs

This section defines the inputs that CEP will receive. To add an input, click the Add Input link and enter the following details.

Topic (topic to receive events from): analytics_Statistics/1.3.0
Broker Name (Broker to be used)    : localAgentBroker
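
Judging from this sample, the topic of an agent broker appears to combine a stream name and a stream version separated by a slash: the publisher shown later looks up the stream analytics_Statistics with version 1.3.0. The following sketch splits a topic on that assumption. TopicName is a hypothetical class used only for illustration.

```java
// Hypothetical value class (not a WSO2 API): splits a topic of the form
// "streamName/version", which is the format assumed from this sample.
public class TopicName {

    public final String streamName;
    public final String version;

    private TopicName(String streamName, String version) {
        this.streamName = streamName;
        this.version = version;
    }

    public static TopicName parse(String topic) {
        int slash = topic.lastIndexOf('/');
        // Both the name and the version must be non-empty.
        if (slash <= 0 || slash == topic.length() - 1) {
            throw new IllegalArgumentException("Expected name/version but got " + topic);
        }
        return new TopicName(topic.substring(0, slash), topic.substring(slash + 1));
    }

    public static void main(String[] args) {
        TopicName input = parse("analytics_Statistics/1.3.0");
        System.out.println(input.streamName + " version " + input.version);
    }
}
```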

Mapping

Stream (Name of the event stream) : analyticsStatisticsStream
Query Event Type                  : Tuple
Input Mapping Type                : Tuple Mapping

Properties (these properties will be extracted from the received tuple event and fed to the CEP engine)

Name            : userOrganization
Input Name      : userOrg
Input Data Type : payload data
Type            : String 

Name            : userID
Input Name      : userID
Input Data Type : payload data
Type            : String  

Name            : country
Input Name      : country
Input Data Type : payload data
Type            : String

Name            : dateInMonth
Input Name      : date
Input Data Type : payload data
Type            : Integer
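
The effect of the property mapping above can be pictured as a rename-and-select step: each Input Name is read from the payload of the incoming tuple and exposed to the query under its Name, while unmapped attributes are dropped. The following plain-Java sketch shows that idea only. It does not use the CEP mapping classes, and InputMappingSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: mimics the tuple input mapping configured above.
public class InputMappingSketch {

    // Input Name in the received tuple -> Name seen by the CEP engine.
    static final Map<String, String> MAPPING = new LinkedHashMap<>();
    static {
        MAPPING.put("userOrg", "userOrganization");
        MAPPING.put("userID", "userID");
        MAPPING.put("country", "country");
        MAPPING.put("date", "dateInMonth");
    }

    public static Map<String, Object> map(Map<String, Object> payload) {
        Map<String, Object> mapped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : MAPPING.entrySet()) {
            if (!payload.containsKey(entry.getKey())) {
                throw new IllegalArgumentException("Missing payload attribute " + entry.getKey());
            }
            mapped.put(entry.getValue(), payload.get(entry.getKey()));
        }
        return mapped;
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("userID", "abc@org1.com");
        payload.put("date", 21);
        payload.put("userOrg", "org1");
        payload.put("country", "Au");
        payload.put("document", "document1"); // not mapped, so it is dropped
        System.out.println(map(payload));
    }
}
```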

 

Section 3 : Queries

This section defines the queries that run on the inputs and the outputs they produce. To add a query, click the Add Query link and enter the following information.

Query Name (To identify the query) : FilterQuery

Expression : from analyticsStatisticsStream[userOrganization=='org1']
			 insert into outStream
			 userID, userOrganization, country;
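
To make the behavior of FilterQuery concrete, the following plain-Java sketch applies the same filter and projection to the five events sent by the publisher later in this sample. It is not how Siddhi executes the query; FilterQuerySketch and AccessEvent are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: a hand-written equivalent of
//   from analyticsStatisticsStream[userOrganization=='org1']
//   insert into outStream userID, userOrganization, country;
public class FilterQuerySketch {

    static final class AccessEvent {
        final String userID;
        final String userOrganization;
        final String country;

        AccessEvent(String userID, String userOrganization, String country) {
            this.userID = userID;
            this.userOrganization = userOrganization;
            this.country = country;
        }
    }

    public static List<String[]> filterQuery(List<AccessEvent> input) {
        List<String[]> outStream = new ArrayList<>();
        for (AccessEvent event : input) {
            if ("org1".equals(event.userOrganization)) {          // filter condition
                outStream.add(new String[]{event.userID,           // projection
                        event.userOrganization, event.country});
            }
        }
        return outStream;
    }

    // The userID, userOrg and country values of the five published events.
    public static List<AccessEvent> sampleEvents() {
        return Arrays.asList(
                new AccessEvent("abc@org1.com", "org1", "Au"),
                new AccessEvent("anne@org2.com", "org2", "SL"),
                new AccessEvent("sam@org1.com", "org1", "Au"),
                new AccessEvent("anne@org1.com", "org2", "SL"),
                new AccessEvent("ann@org3.com", "org3", "US"));
    }

    public static void main(String[] args) {
        for (String[] row : filterQuery(sampleEvents())) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Note that the fourth event has the user ID anne@org1.com but the organization org2, so it does not pass the filter: the query compares the userOrganization property, not the e-mail address.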

Output(Define the output)

Topic          : users.org1/1.2.0
Broker Name    : externalAgentBroker
Output Mapping : Tuple Mapping

Tuple Mapping

Meta Data

Name : organization       value of : userOrganization       Type : String
Name : country            value of : country                Type : String

Payload data

Name : email              value of : userID                 Type : String
After filling in all the required fields of the query, click the Add Query button to add the query to the bucket. The query form closes and the new query appears in the Queries table.
As the last step, click the Save button. You are redirected to the deployed buckets page, where the new bucket appears in the buckets table. To check that the details were entered correctly, click the link with the bucket's name to view them. From the View Bucket page, click the Back button to return to the previous page, or click the "Edit" link at the top of the page to edit the bucket.
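
The output tuple mapping configured for FilterQuery places userOrganization and country in the meta data of the outgoing tuple and userID in its payload under the name email. The following plain-Java sketch shows the resulting layout for one event. OutputMappingSketch and OutputTuple are hypothetical names, and the real mapping is performed by CEP itself.

```java
import java.util.Arrays;

// Illustration only: mimics the output tuple mapping configured above.
public class OutputMappingSketch {

    public static final class OutputTuple {
        public final Object[] metaData;    // organization, country
        public final Object[] payloadData; // email

        OutputTuple(Object[] metaData, Object[] payloadData) {
            this.metaData = metaData;
            this.payloadData = payloadData;
        }
    }

    // Arguments are the three attributes that FilterQuery inserts into outStream.
    public static OutputTuple map(String userID, String userOrganization, String country) {
        return new OutputTuple(
                new Object[]{userOrganization, country},
                new Object[]{userID});
    }

    public static void main(String[] args) {
        OutputTuple tuple = map("sam@org1.com", "org1", "Au");
        System.out.println("metaData=" + Arrays.toString(tuple.metaData)
                + ", payloadData=" + Arrays.toString(tuple.payloadData));
    }
}
```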

Define output subscriber

When events are sent to the CEP engine, a subscriber must be listening on the output topic configured in the bucket's query in order to receive the filtered events. In this sample, that means hosting a Thrift receiving server at the address configured for externalAgentBroker. The following class hosts a Thrift receiver that listens on port 7661, the port specified when creating that broker.

Info

The sample project containing the subscriber and the publisher is available at the following link:

https://svn.wso2.org/repos/wso2/people/suho/doc-sample/Access_Analyzer-Publisher_Subscriber.zip

TestAgentServer
import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridge;
import org.wso2.carbon.databridge.core.definitionstore.InMemoryStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.DataBridgeException;
import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
import org.wso2.carbon.databridge.receiver.thrift.internal.ThriftDataReceiver;


import java.net.SocketException;
import java.util.List;


public class TestAgentServer {
    Logger log = Logger.getLogger(TestAgentServer.class);
    ThriftDataReceiver thriftDataReceiver;
    static final TestAgentServer testServer = new TestAgentServer();




    public static void main(String[] args) throws DataBridgeException {
        testServer.start(7661);
        synchronized (testServer) {
            try {
                testServer.wait();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let main return.
                Thread.currentThread().interrupt();
            }
        }
    }


    public void start(int receiverPort) throws DataBridgeException {
        // KeyStoreUtil is a helper class included in the sample project.
        KeyStoreUtil.setKeyStoreParams();
        DataBridge databridge = new DataBridge(new AuthenticationHandler() {
            @Override
            public boolean authenticate(String userName,
                                        String password) {
                return true; // always authenticate successfully (test server only)
            }
        }, new InMemoryStreamDefinitionStore());


        thriftDataReceiver = new ThriftDataReceiver(receiverPort, databridge);


        databridge.subscribe(new AgentCallback() {
            int totalSize = 0;


            public void definedStream(StreamDefinition streamDefinition,
                                      Credentials credentials) {
                log.info("StreamDefinition " + streamDefinition);
            }


            @Override
            public void receive(List<Event> eventList, Credentials credentials) {
                log.info("eventListSize=" + eventList.size() + " eventList " + eventList + " for username " + credentials.getUsername());
            }


        });


        try {
            String address = HostAddressFinder.findAddress("localhost");
            log.info("Test Server starting on " + address);
            thriftDataReceiver.start(address);
            log.info("Test Server Started");
        } catch (SocketException e) {
            log.error("Test Server not started !", e);
        }
    }


    public void stop() {
        thriftDataReceiver.stop();
        log.info("Test Server Stopped");
    }
}

Run this class from a separate terminal. Once the sample is running you will be able to see the results from that terminal.
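
TestAgentServer calls KeyStoreUtil, a helper from the sample project that is not listed on this page. The following is only a guess at what such a helper might look like, for readers who want to run the class without the project. The trust store properties are the standard JSSE ones. The Security.KeyStore.* property names, the key store file names and the password "wso2carbon" are assumptions about the WSO2 defaults and should be checked against the helper in the sample project.

```java
import java.io.File;

// A sketch under stated assumptions, not the KeyStoreUtil class from the sample project.
public class KeyStoreUtilSketch {

    // Assumed default password of the key stores shipped with WSO2 products.
    static final String ASSUMED_PASSWORD = "wso2carbon";

    // Used by the publisher: lets the client trust the server's SSL certificate.
    public static void setTrustStoreParams(File resourceDir) {
        File trustStore = new File(resourceDir, "client-truststore.jks"); // assumed file name
        System.setProperty("javax.net.ssl.trustStore", trustStore.getAbsolutePath());
        System.setProperty("javax.net.ssl.trustStorePassword", ASSUMED_PASSWORD);
    }

    // Used by the receiver: points the SSL authentication port at a key store.
    // The property names below are assumptions, not confirmed by this page.
    public static void setKeyStoreParams(File resourceDir) {
        File keyStore = new File(resourceDir, "wso2carbon.jks"); // assumed file name
        System.setProperty("Security.KeyStore.Location", keyStore.getAbsolutePath());
        System.setProperty("Security.KeyStore.Password", ASSUMED_PASSWORD);
    }

    public static void main(String[] args) {
        // "src/main/resources" is a placeholder directory for this illustration.
        setTrustStoreParams(new File("src/main/resources"));
        System.out.println(System.getProperty("javax.net.ssl.trustStore"));
    }
}
```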

Sending events to CEP engine

The last step in invoking the bucket is sending events to the CEP engine. The bucket service is deployed as an Axis2 service, but in this sample the event tuples are sent through the Thrift port of the server. A client is therefore needed to generate the event stream and send it to the CEP Thrift port. The following client accomplishes that task.

AccessDataPublisher
import org.wso2.carbon.databridge.agent.thrift.DataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.exception.AuthenticationException;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.databridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.carbon.databridge.core.exception.DataBridgeException;

import java.net.MalformedURLException;

public class AccessDataPublisher {


    public static void main(String[] args)
            throws DataBridgeException, AgentException, MalformedURLException,
                   AuthenticationException, TransportException, MalformedStreamDefinitionException,
                   StreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException,
                   InterruptedException {

        // KeyStoreUtil is a helper class included in the sample project.
        KeyStoreUtil.setTrustStoreParams();

        // By convention, the authentication port is the receiver port + 100
        // (7611 + 100 = 7711) on the same host.

        DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin");

        String streamId;
        try {
            streamId = dataPublisher.findStream("analytics_Statistics", "1.3.0");
        } catch (NoStreamDefinitionExistException e) {
            streamId = dataPublisher.defineStream("{" +
                                                  "  'name':'analytics_Statistics'," +
                                                  "  'version':'1.3.0'," +
                                                  "  'nickName': 'Analytics Statistics Information'," +
                                                  "  'description': 'Details of Analytics Statistics'," +
                                                  "  'metaData':[" +
                                                  "          {'name':'ipAdd','type':'STRING'}" +
                                                  "  ]," +
                                                  "  'payloadData':[" +
                                                  "          {'name':'userID','type':'STRING'}," +
                                                  "          {'name':'timeStamp','type':'FLOAT'}," +
                                                  "          {'name':'month','type':'STRING'}," +
                                                  "          {'name':'date','type':'INT'}," +
                                                  "          {'name':'monthsPassed','type':'INT'}," +
                                                  "          {'name':'userOrg','type':'STRING'}," +
                                                  "          {'name':'document','type':'STRING'}," +
                                                  "          {'name':'country','type':'STRING'}," +
                                                  "          {'name':'name','type':'STRING'}," +
                                                  "          {'name':'referalurl','type':'STRING'}," +
                                                  "          {'name':'tag','type':'STRING'}," +
                                                  "          {'name':'searchTerms','type':'STRING'}" +
                                                  "  ]" +
                                                  "}");

        }

        //In this case correlation data is null
        dataPublisher.publish(streamId, new Object[]{"192.168.1.1"}, null, new Object[]{"abc@org1.com", 120f, "May", 21, 24148, "org1", "document1", "Au", "abc", "http://wso2.com", "other", "searchTerms"});
        dataPublisher.publish(streamId, new Object[]{"192.168.1.1"}, null, new Object[]{"anne@org2.com", 2342f, "June", 12, 24149, "org2", "document2", "SL", "anne", "http://wso2.com/products", "BAM", "business"});
        dataPublisher.publish(streamId, new Object[]{"192.168.1.3"}, null, new Object[]{"sam@org1.com", 12414f, "May", 19, 24148, "org1", "document3", "Au", "sam", "http://wso2.com/team", "team", "team"});
        dataPublisher.publish(streamId, new Object[]{"192.168.1.2"}, null, new Object[]{"anne@org1.com", 489f, "April", 25, 24147, "org2", "document4", "SL", "anne", "http://wso2.com", "other", "searchTerms"});
        dataPublisher.publish(streamId, new Object[]{"192.168.1.3"}, null, new Object[]{"ann@org3.com", 21324f, "March", 10, 24146, "org3", "document5", "US", "ann", "http://wso2.com/products", "cloud", "cloud"});

        Thread.sleep(3000);
        dataPublisher.stop();
    }
}

After running this class, if everything has been set up correctly, the receiver console shows output similar to the following.

[java] StreamDefinition StreamDefinition{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', name='users.org1', version='1.2.0', nickName='null', description='null', tags=null, metaData=[Attribute{name='organization', type=STRING}, Attribute{name='country', type=STRING}], correlationData=[], payloadData=[Attribute{name='email', type=STRING}]}
[java] admin connected
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[sam@org1.com]}] for username admin
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[abc@org1.com]}] for username admin
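
If no events arrive, one thing worth checking is the payload array passed to publish in AccessDataPublisher: its values are positional and have to follow the order and types of payloadData in the stream definition. The following sketch checks an array against the twelve payload attributes of analytics_Statistics. PayloadCheck is a hypothetical helper, and the Java type chosen for each attribute type (String, Integer, Float) is an assumption based on the values the publisher sends.

```java
// Hypothetical helper (not a WSO2 API): validates a payload array against the
// payloadData section of the analytics_Statistics stream definition.
public class PayloadCheck {

    // Attribute types used by the stream, with the Java type assumed for each.
    public enum AttrType {
        STRING(String.class), INT(Integer.class), FLOAT(Float.class);

        final Class<?> javaType;

        AttrType(Class<?> javaType) {
            this.javaType = javaType;
        }
    }

    static final String[] NAMES = {
        "userID", "timeStamp", "month", "date", "monthsPassed", "userOrg",
        "document", "country", "name", "referalurl", "tag", "searchTerms"};

    static final AttrType[] TYPES = {
        AttrType.STRING, AttrType.FLOAT, AttrType.STRING, AttrType.INT,
        AttrType.INT, AttrType.STRING, AttrType.STRING, AttrType.STRING,
        AttrType.STRING, AttrType.STRING, AttrType.STRING, AttrType.STRING};

    // Returns null when the payload matches, otherwise a description of the first problem.
    public static String firstMismatch(Object[] payload) {
        if (payload.length != NAMES.length) {
            return "expected " + NAMES.length + " values but got " + payload.length;
        }
        for (int i = 0; i < payload.length; i++) {
            if (!TYPES[i].javaType.isInstance(payload[i])) {
                return NAMES[i] + " should be " + TYPES[i];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Object[] first = {"abc@org1.com", 120f, "May", 21, 24148, "org1",
            "document1", "Au", "abc", "http://wso2.com", "other", "searchTerms"};
        System.out.println(firstMismatch(first) == null ? "payload OK" : firstMismatch(first));
    }
}
```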

Copyright © WSO2 Inc. 2005-2014