Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Excerpt
hiddentrue

WSO2 CEP Samples : Access Analysis sample using Tuple mapping, Siddhi CEP Engine and local broker

...

Before creating the bucket to filter stock quotes it is essential to have a Carbon broker. Since In this example we are going to use agent Broker, it is needed to create a broker with type agent.To do that

  1. Start CEP Server.   Refer to theRunning Installing the Product for instructions.
  2. Login as admin.

  3. In the Configure menu you can find a Menu item called "Broker" and under that you can see sub menu 'Add' and click on that.

  4. You will get a page with header "Create a New Broker" and you need to enter following details in that form to create a agent broker.

    No Format
     Broker Name       : localAgentBroker 
     Type              : agent
     URL               : tcp://localhost:7611
     Authenticator URL : ssl://localhost:7711          
     User Name         : admin
     Password          : admin
  5. Finally click on Add Broker button and you will get the added broker to the list of available brokers.
  6. Repeat step 3 & 4 to create another new broker with following details.   

    No Format
     Broker Name       : externalAgentBroker 
     Type              : agent 
     URL               : tcp://localhost:7661    
     Authenticator URL : ssl://localhost:7761      
     User Name         : admin
     Password          : admin

    For more information about brokers visit here.

...

Note
As the last step of adding a bucket , click on save button. Once you clicked it will redirect to the deployed buckets page and deployed bucket will be appeared in buckets table. Further to check whether you have entered the details correctly you can click on the link with the defined bucket and it will show added details. You can come back to the previous adding bucket page by clicking on the back button provided or if needed to edit the bucket you can click on the "Edit" link on the top of the View bucket Page.

Invoking Deployed Bucket

When the bucket is successfully deployed, an axis2 service will be automatically created. To send events to bucket we can use that service. Also we can output subscriber to handle outputs using the output topic name.

Define output subscriber

When the user send events to CEP engine with this service, there should be a subscriber to the output topic given, when configuring the query of the bucket, to receive filtered events from the complex event processing engine. In this particular sample we need to host a thrift receiving server at specified address to receive output events. Following class will host a thrift receiver listening on ports specified when creating the bucket.host a thrift receiver listening on ports specified when creating the bucket.

Info
titleInfo

Find the sample project file of the subscriber and publisher from the below link;

https://svn.wso2.org/repos/wso2/people/suho/doc-sample/Access_Analyzer-Publisher_Subscriber.zip

Code Block
languagejava
titleTestAgentServer
linenumberstrue
collapsetrue
import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridge;
import org.wso2.carbon.databridge.core.definitionstore.InMemoryStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.DataBridgeException;
import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
import org.wso2.carbon.databridge.receiver.thrift.internal.ThriftDataReceiver;


import java.net.SocketException;
import java.util.List;


public class TestAgentServer {
    Logger log = Logger.getLogger(TestAgentServer.class);
    ThriftDataReceiver thriftDataReceiver;
    static final TestAgentServer testServer = new TestAgentServer();




    public static void main(String[] args) throws DataBridgeException {
        testServer.start(7661);
        synchronized (testServer) {
            try {
                testServer.wait();
            } catch (InterruptedException ignored) {


            }
        }
    }


    public void start(int receiverPort) throws DataBridgeException {
        KeyStoreUtil.setKeyStoreParams();
        DataBridge databridge = new DataBridge(new AuthenticationHandler() {
            @Override
            public boolean authenticate(String userName,
                                        String password) {
                return true;// allays authenticate to true


            }
        }, new InMemoryStreamDefinitionStore());


        thriftDataReceiver = new ThriftDataReceiver(receiverPort, databridge);


        databridge.subscribe(new AgentCallback() {
            int totalSize = 0;


            public void definedStream(StreamDefinition streamDefinition,
                                      Credentials credentials) {
                log.info("StreamDefinition " + streamDefinition);
            }


            @Override
            public void receive(List<Event> eventList, Credentials credentials) {
                log.info("eventListSize=" + eventList.size() + " eventList " + eventList + " for username " + credentials.getUsername());
            }


        });


        try {
            String address = HostAddressFinder.findAddress("localhost");
            log.info("Test Server starting on " + address);
            thriftDataReceiver.start(address);
            log.info("Test Server Started");
        } catch (SocketException e) {
            log.error("Test Server not started !", e);
        }
    }


    public void stop() {
        thriftDataReceiver.stop();
        log.info("Test Server Stopped");
    }
}

...

Code Block
languagebash
[java] StreamDefinition StreamDefinition{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', name='users.org1', version='1.2.0', nickName='null', description='null', tags=null, metaData=[Attribute{name='organization', type=STRING}, Attribute{name='country', type=STRING}], correlationData=[], payloadData=[Attribute{name='email', type=STRING}]}
[java] admin connected
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[sam@org1.com]}] for username admin
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[abc@org1.com]}] for username admin

 

Find the sample project file of the subscriber and publisher from the below link;

https://svn.wso2.org/repos/wso2/people/suho/doc-sample/Access_Analyzer-Publisher_Subscriber.zip

 

...