This documentation is for WSO2 CEP 2.1.0. View the home page of the latest release.


This section describes how to send WSO2 events to the agent broker through custom data publishers. There are two main ways to do this:

  1. Java API (see the example in KPI Analyzer).
  2. REST API (see the example in Build Analyzer).

With either method, data can be published using one of three publishers:

  • DataPublisher
  • AsyncDataPublisher
  • Load Balancing DataPublisher

The following blog post and article give more information about these publishers:

http://sinthu-rajan.blogspot.com/2012/12/non-blocking-data-publshing-for-bamcep.html

http://wso2.org/library/articles/2012/07/creating-custom-agents-publish-events-bamcep

Whichever publisher you use, you must define the stream before sending any events.

In earlier CEP versions, every value sent in an event had to match the stream definition. CEP 2.1.0 additionally supports sending arbitrary key-value pairs that are not defined in the stream, as described in the rest of this section.

Supporting arbitrary key-value pairs in WSO2 events 

Earlier CEP versions do not support sending key-value pairs that are not defined in the stream. WSO2 CEP 2.1.0 does, but only through the agent broker that CEP provides by default.

Arbitrary values must be sent as a Map, and both the key and the value of each entry must be Strings.
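Because every entry must be String to String, values of other types (numbers, flags) have to be converted before they are attached to an event. The following is a minimal sketch of such a conversion using only `java.util`. The class `ArbitraryDataConverter` and its method are hypothetical names chosen for illustration and are not part of the WSO2 agent API. Rejecting null entries is an assumption of this sketch, not a documented CEP rule.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the WSO2 agent API.
final class ArbitraryDataConverter {

    private ArbitraryDataConverter() {
    }

    // Copies the input into a String-to-String map, the format required
    // for arbitrary data. Values are converted with String.valueOf.
    static Map<String, String> toStringMap(Map<String, ?> raw) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        for (Map.Entry<String, ?> entry : raw.entrySet()) {
            // Assumption of this sketch: null keys or values are rejected.
            if (entry.getKey() == null || entry.getValue() == null) {
                throw new IllegalArgumentException("Keys and values must not be null");
            }
            result.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return result;
    }
}
```

The returned map can then be passed as the arbitrary data argument when the event is created.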

1. Creating the agent and datapublisher to publish the data

AgentConfiguration agentConfiguration = new AgentConfiguration();
Agent agent = new Agent(agentConfiguration);
        
DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin", agent);

2. Defining the stream as shown below.  

String streamId = dataPublisher.defineStream("{" +
        "  'name':'" + PHONE_RETAIL_STREAM + "'," +
        "  'version':'" + VERSION + "'," +
        "  'nickName': 'Phone_Retail_Shop'," +
        "  'description': 'Phone Sales'," +
        "  'metaData':[" +
        "          {'name':'clientType','type':'STRING'}" +
        "  ]," +
        "  'payloadData':[" +
        "          {'name':'brand','type':'STRING'}," +
        "          {'name':'quantity','type':'INT'}," +
        "          {'name':'total','type':'INT'}" +
        "  ]" +
        "}");
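Hand-concatenated definition strings make it easy to leave a stray comma or quote behind. As one possible alternative, the sketch below assembles the same kind of definition string from attribute lists. `StreamDefinitionBuilder` is a hypothetical helper, not a class shipped with CEP, and it covers only the name, version, metaData and payloadData fields (nickName and description are omitted for brevity). The stream name and version in the usage note are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the WSO2 agent API.
final class StreamDefinitionBuilder {
    private final String name;
    private final String version;
    private final List<String> metaData = new ArrayList<String>();
    private final List<String> payloadData = new ArrayList<String>();

    StreamDefinitionBuilder(String name, String version) {
        this.name = name;
        this.version = version;
    }

    // Adds an attribute to the metaData section.
    StreamDefinitionBuilder meta(String attributeName, String type) {
        metaData.add(attribute(attributeName, type));
        return this;
    }

    // Adds an attribute to the payloadData section.
    StreamDefinitionBuilder payload(String attributeName, String type) {
        payloadData.add(attribute(attributeName, type));
        return this;
    }

    // Joins the attributes with commas, so no trailing comma is emitted.
    String build() {
        return "{"
                + "'name':'" + name + "',"
                + "'version':'" + version + "',"
                + "'metaData':[" + String.join(",", metaData) + "],"
                + "'payloadData':[" + String.join(",", payloadData) + "]"
                + "}";
    }

    private static String attribute(String attributeName, String type) {
        return "{'name':'" + attributeName + "','type':'" + type + "'}";
    }
}
```

For example, `new StreamDefinitionBuilder("phone.retail.sketch", "1.0.0").meta("clientType", "STRING").payload("brand", "STRING").build()` produces a string that could be passed to `dataPublisher.defineStream(...)`.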

3. Publishing the data

Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500});

dataPublisher.publish(eventOne); 

To also publish a value that is not defined in the stream, such as the name of the buyer, follow the steps below.

4. Defining a Map to hold the key-value pairs

Map<String, String> arbitraryData = new HashMap<String, String>();
arbitraryData.put("buyer","Joe");

The key must follow these rules:

  1. To send the pair as meta data, prefix the key with "meta.", e.g. arbitraryData.put("meta.buyer","Joe");
  2. To send the pair as correlation data, prefix the key with "correlation.", e.g. arbitraryData.put("correlation.buyer","Joe");
  3. To send the pair as payload data, use no prefix, e.g. arbitraryData.put("buyer","Joe");

Apart from the two prefixes above, a key must not contain a dot (.).
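The key rules above can be checked before publishing. The sketch below classifies a key as meta, correlation or payload data and rejects any key that contains a dot outside the two allowed prefixes. `ArbitraryKeys` and its `Category` enum are hypothetical names for illustration; CEP itself does not expose such a helper, and throwing an exception on an invalid key is a choice made for this sketch.

```java
// Hypothetical helper for illustration; not part of the WSO2 agent API.
final class ArbitraryKeys {

    enum Category { META, CORRELATION, PAYLOAD }

    private static final String META_PREFIX = "meta.";
    private static final String CORRELATION_PREFIX = "correlation.";

    private ArbitraryKeys() {
    }

    // Returns the category implied by the key prefix, or throws
    // IllegalArgumentException if the key breaks the dot rule.
    static Category classify(String key) {
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("Key must not be empty");
        }
        Category category;
        String rest;
        if (key.startsWith(META_PREFIX)) {
            category = Category.META;
            rest = key.substring(META_PREFIX.length());
        } else if (key.startsWith(CORRELATION_PREFIX)) {
            category = Category.CORRELATION;
            rest = key.substring(CORRELATION_PREFIX.length());
        } else {
            category = Category.PAYLOAD;
            rest = key;
        }
        // After the optional prefix, no further dot is allowed.
        if (rest.isEmpty() || rest.indexOf('.') >= 0) {
            throw new IllegalArgumentException("Invalid key: " + key);
        }
        return category;
    }
}
```

With this sketch, "meta.buyer" is classified as META, "correlation.buyer" as CORRELATION and "buyer" as PAYLOAD, while a key such as "shop.buyer" is rejected.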

5. Publishing the map with the event

Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500},arbitraryData);

dataPublisher.publish(eventOne);