Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Excerpt
hiddentrue

WSO2 CEP Samples : Access Analysis sample using Tuple mapping, Siddhi CEP Engine and local broker

...

Before creating the bucket to filter stock quotes it is essential to have a Carbon broker. Since In this example we are going to use agent Broker, it is needed to create a broker with type agent.To do that

  1. Start CEP Server.   Refer to theRunning Installing the Product for instructions.
  2. Login as admin.

  3. In the Configure menu you can find a Menu item called "Broker" and under that you can see sub menu 'Add' and click on that.

  4. You will get a page with header "Create a New Broker" and you need to enter following details in that form to create a agent broker.

    No Format
     Broker Name       : localAgentBroker 
     Type              : agent
     URL               : tcp://localhost:7611
     Authenticator URL : ssl://localhost:7711          
     User Name         : admin
     Password          : admin
  5. Finally click on Add Broker button and you will get the added broker to the list of available brokers.
  6. Repeat step 3 & 4 to create another new broker with following details.   

    No Format
     Broker Name       : externalAgentBroker 
     Type              : agent 
     URL               : tcp://localhost:7661    
     Authenticator URL : ssl://localhost:7761      
     User Name         : admin
     Password          : admin

    For more information about brokers visit here.

...

When the user send events to CEP engine with this service, there should be a subscriber to the output topic given, when configuring the query of the bucket, to receive filtered events from the complex event processing engine. In this particular sample we need to host a thrift receiving server at specified address to receive output events. Following class will host a thrift receiver listening on ports specified when creating the bucket.

Info
titleInfo

Find the sample project file of the subscriber and publisher from the below link;

https://svn.wso2.org/repos/wso2/people/suho/doc-sample/Access_Analyzer-Publisher_Subscriber.zip

Code Block
languagejava
titleTestAgentServer
linenumberstrue
collapsetrue
import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.commons.Credentials;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;
import org.wso2.carbon.databridge.core.AgentCallback;
import org.wso2.carbon.databridge.core.DataBridge;
import org.wso2.carbon.databridge.core.definitionstore.InMemoryStreamDefinitionStore;
import org.wso2.carbon.databridge.core.exception.DataBridgeException;
import org.wso2.carbon.databridge.core.internal.authentication.AuthenticationHandler;
import org.wso2.carbon.databridge.receiver.thrift.internal.ThriftDataReceiver;


import java.net.SocketException;
import java.util.List;


public class TestAgentServer {
    Logger log = Logger.getLogger(TestAgentServer.class);
    ThriftDataReceiver thriftDataReceiver;
    static final TestAgentServer testServer = new TestAgentServer();




    public static void main(String[] args) throws DataBridgeException {
        testServer.start(7661);
        synchronized (testServer) {
            try {
                testServer.wait();
            } catch (InterruptedException ignored) {


            }
        }
    }


    public void start(int receiverPort) throws DataBridgeException {
        KeyStoreUtil.setKeyStoreParams();
        DataBridge databridge = new DataBridge(new AuthenticationHandler() {
            @Override
            public boolean authenticate(String userName,
                                        String password) {
                return true;// allays authenticate to true


            }
        }, new InMemoryStreamDefinitionStore());


        thriftDataReceiver = new ThriftDataReceiver(receiverPort, databridge);


        databridge.subscribe(new AgentCallback() {
            int totalSize = 0;


            public void definedStream(StreamDefinition streamDefinition,
                                      Credentials credentials) {
                log.info("StreamDefinition " + streamDefinition);
            }


            @Override
            public void receive(List<Event> eventList, Credentials credentials) {
                log.info("eventListSize=" + eventList.size() + " eventList " + eventList + " for username " + credentials.getUsername());
            }


        });


        try {
            String address = HostAddressFinder.findAddress("localhost");
            log.info("Test Server starting on " + address);
            thriftDataReceiver.start(address);
            log.info("Test Server Started");
        } catch (SocketException e) {
            log.error("Test Server not started !", e);
        }
    }


    public void stop() {
        thriftDataReceiver.stop();
        log.info("Test Server Stopped");
    }
}

...

Code Block
languagebash
[java] StreamDefinition StreamDefinition{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', name='users.org1', version='1.2.0', nickName='null', description='null', tags=null, metaData=[Attribute{name='organization', type=STRING}, Attribute{name='country', type=STRING}], correlationData=[], payloadData=[Attribute{name='email', type=STRING}]}
[java] admin connected
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[sam@org1.com]}] for username admin
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[abc@org1.com]}] for username admin
Info
titleInfo

Find the sample project file of the subscriber and publisher from the below link;

https://svn.wso2.org/repos/wso2/people/suho/doc-sample/Access_Analyzer-Publisher_Subscriber.zip