The Access Analysis sample is based on the user access data of an organization. In this sample, WSO2 CEP receives a set of access events and emits an output when the organization name matches a predefined name. Both inputs and outputs are sent as tuples, so tuple mapping is used.
Create Agent Broker
Before creating the bucket that filters access events, you need a Carbon broker. Because this example uses the agent broker, you need to create a broker of type agent. To do that:
- Start the CEP server. Refer to Running the Product for instructions.
- Log in as admin.
- In the Configure menu, find the menu item called "Broker" and click its "Add" sub menu.
- A page with the header "Create a New Broker" opens. Enter the following details in the form to create an agent broker.
  Broker Name : localAgentBroker
  Type : agent
  URL : tcp://localhost:7611
  Authenticator URL : ssl://localhost:7711
  User Name : admin
  Password : admin
- Finally, click the Add Broker button. The new broker appears in the list of available brokers.
Repeat steps 3 and 4 to create a second broker with the following details.
  Broker Name : externalAgentBroker
  Type : agent
  URL : tcp://localhost:7661
  Authenticator URL : ssl://localhost:7761
  User Name : admin
  Password : admin
For more information about brokers visit here.
Create Bucket with Siddhi
To create a bucket, use the Add menu item under CEP Buckets in the Main menu. The bucket creation form has three major sections: basic information, inputs, and queries. How to fill in each section is described below. If you need more information about buckets visit here.
Section 1 : Basic Information
Use the following information to fill in the basic information section, as shown in the screenshot below.
Bucket Name (name of the bucket) : AccessAnalysisBucket
Description (description of the bucket) : Analyses user access
Engine Provider (CEP runtime engine to be used) : SiddhiCEPRuntime [choose from the drop-down]
Persistence snapshot time interval in minutes : 0
Enable distributed processing : false
Section 2 : Inputs
This section defines the inputs that CEP will receive. To add an input, click the Add Input link and enter the following details. A screenshot is provided below for your convenience.
Topic (topic on which events are received) : analytics_Statistics/1.3.0
Broker Name (broker to be used) : localAgentBroker
Mapping Stream (name of the event stream) : analyticsStatisticsStream
Query Event Type : Tuple
Input Mapping Type : Tuple
Mapping Properties (these properties are extracted from the received tuple event and fed to the CEP engine)
  Name : userOrganization   Input Name : userOrg   Input Data Type : payload data   Type : String
  Name : userID   Input Name : userID   Input Data Type : payload data   Type : String
  Name : country   Input Name : country   Input Data Type : payload data   Type : String
  Name : dateInMonth   Input Name : date   Input Data Type : payload data   Type : Integer
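The mapping properties above can be read as an extract-and-rename step: each property is taken from the payload data of the received tuple under its input name and handed to the engine under its stream property name. The following plain-Java sketch illustrates only that idea. It does not use the WSO2 CEP API; the class name InputMappingSketch, the method mapPayload, and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: models the tuple input mapping as a rename step.
// InputMappingSketch and mapPayload are hypothetical names, not WSO2 CEP classes.
final class InputMappingSketch {

    // Input name (as it appears in the received tuple) -> property name
    // on analyticsStatisticsStream, taken from the Mapping Properties above.
    static final Map<String, String> INPUT_TO_PROPERTY = new LinkedHashMap<>();
    static {
        INPUT_TO_PROPERTY.put("userOrg", "userOrganization");
        INPUT_TO_PROPERTY.put("userID", "userID");
        INPUT_TO_PROPERTY.put("country", "country");
        INPUT_TO_PROPERTY.put("date", "dateInMonth");
    }

    // Copies the mapped payload attributes into a new map keyed by stream
    // property name. Attributes that are not listed in the mapping are dropped.
    static Map<String, Object> mapPayload(Map<String, Object> payload) {
        Map<String, Object> mapped = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : INPUT_TO_PROPERTY.entrySet()) {
            if (payload.containsKey(entry.getKey())) {
                mapped.put(entry.getValue(), payload.get(entry.getKey()));
            }
        }
        return mapped;
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("userOrg", "org1");
        payload.put("userID", "sam@org1.com");
        payload.put("country", "Au");
        payload.put("date", 12);
        System.out.println(mapPayload(payload));
    }
}
```

Running main prints the event keyed by the stream property names, which are the names the query in Section 3 refers to.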
Section 3 : Queries
This section defines the queries that run on the inputs and the outputs they produce. To add a query, click the Add Query link and enter the following information. A screenshot is provided below for your convenience.
Query Name (to identify the query) : FilterQuery
Expression : from analyticsStatisticsStream[userOrganization=='org1'] insert into outStream userID, userOrganization, country;
Output (define the output)
Topic : users.org1/1.2.0
Broker Name : externalAgentBroker
Output Mapping : Tuple Mapping
Tuple Mapping
  Meta Data
    Name : organization   Value Of : userOrganization   Type : String
    Name : country   Value Of : country   Type : String
  Payload Data
    Name : email   Value Of : userID   Type : String
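Read as plain logic, FilterQuery keeps only the events whose userOrganization equals 'org1' and projects userID, userOrganization, and country into outStream. The output mapping then places organization and country in the meta data and userID (as email) in the payload data. The sketch below mirrors that behaviour in plain Java as an aid to reading the query. It is not how Siddhi executes the query, and FilterQuerySketch, OutEvent, and the sample events are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: plain-Java equivalent of the filter and the
// tuple output mapping. None of these names belong to WSO2 CEP or Siddhi.
final class FilterQuerySketch {

    // Output tuple shaped like the output mapping above.
    static final class OutEvent {
        final List<Object> metaData;    // [organization, country]
        final List<Object> payloadData; // [email]

        OutEvent(List<Object> metaData, List<Object> payloadData) {
            this.metaData = metaData;
            this.payloadData = payloadData;
        }
    }

    // Keeps events with userOrganization == "org1" and maps them to OutEvent.
    static List<OutEvent> filter(List<Map<String, Object>> events) {
        List<OutEvent> out = new ArrayList<>();
        for (Map<String, Object> e : events) {
            if ("org1".equals(e.get("userOrganization"))) {
                out.add(new OutEvent(
                        Arrays.<Object>asList(e.get("userOrganization"), e.get("country")),
                        Collections.<Object>singletonList(e.get("userID"))));
            }
        }
        return out;
    }

    // Helper that builds one input event keyed by stream property name.
    static Map<String, Object> event(String org, String userId, String country, int dateInMonth) {
        Map<String, Object> e = new LinkedHashMap<>();
        e.put("userOrganization", org);
        e.put("userID", userId);
        e.put("country", country);
        e.put("dateInMonth", dateInMonth);
        return e;
    }

    public static void main(String[] args) {
        // Sample events are made up for illustration.
        List<Map<String, Object>> events = Arrays.asList(
                event("org1", "sam@org1.com", "Au", 3),
                event("org2", "kim@org2.com", "Us", 3),
                event("org1", "abc@org1.com", "Au", 4));
        for (OutEvent o : filter(events)) {
            System.out.println("metaData=" + o.metaData + ", payloadData=" + o.payloadData);
        }
    }
}
```

In this sketch the org2 event is dropped, and dateInMonth never reaches the output because the query does not select it.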
Invoking Deployed Bucket
When the bucket is successfully deployed, an Axis2 service is created automatically, and we can use that service to send events to the bucket. We can also register an output subscriber, using the output topic name, to handle the outputs.
Define output subscriber
When events are sent to the CEP engine through this service, a subscriber must be listening on the output topic given in the bucket's query configuration so that it receives the filtered events from the complex event processing engine. In this sample, that means hosting a thrift receiving server at the address of the output broker. The following class hosts a thrift receiver listening on the ports configured for externalAgentBroker.
Run this class from a separate terminal. Once the sample is running you will be able to see the results from that terminal.
Sending events to CEP engine
The last step of invoking the bucket is sending events to the CEP engine. As stated earlier, the bucket service is deployed as an Axis2 service. In this sample, however, we send event tuples through the thrift port of the server, so we need a client that generates an event stream and sends it to CEP's thrift port. The following client accomplishes that task.
After running this class, if everything has been set up correctly, you will see the following output on the receiver console.
[java] StreamDefinition StreamDefinition{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', name='users.org1', version='1.2.0', nickName='null', description='null', tags=null, metaData=[Attribute{name='organization', type=STRING}, Attribute{name='country', type=STRING}], correlationData=[], payloadData=[Attribute{name='email', type=STRING}]}
[java] admin connected
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[sam@org1.com]}] for username admin
[java] eventListSize=1 eventList [Event{streamId='users.org1-1.2.0-12ecdab1-0bb0-44ff-b782-3af4040f0890', timeStamp=0, metaData=[org1, Au], correlationData=[], payloadData=[abc@org1.com]}] for username admin