Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

Here we are going to discuss about how we can This section describes the following main methods available to send WSO2 events to Agent Broker. There are two main methods available to do this.broker via Custom Data Publishers.

In both methods above mentioned methods, we can publish data in 3 ways as follows. They are

  • Data Publisher
  • AsyncDataPublisher
  • Load Balancing DataPublisher

Following blog post and article will provide more information regarding the above mentioned ways to publish data.

http://sinthu-rajan.blogspot.com/2012/12/non-blocking-data-publshing-for-bamcep.html

Info
titleInfo

For more information on data publishers, refer to this article in WSO2 library: http://wso2.org/library/articles/2012/07/creating-custom-agents-publish-events-bamcep.

When publishing data in above mentioned ways, Before sending events we need to using any of the three methods mentioned above, you should define the stream definition to send data before sending events.All the values that are send  

Supporting Arbitrary Key-Value Pairs in WSO2 Events 

Although in previous CEP release versions, all values sent in the event must match with the stream definition (Previous CEP versions) but our CEP , CEP version 2.1.0 supports to send arbitrary key-value pairs which are not defined in the stream.

Below we have discussed further regarding that.

Supporting arbitrary key-value pairs in WSO2 events 

Our previous CEP versions are not and after have support to send arbitrary key-value values pairs, which are not defined in the stream.But WSO2 CEP 2.1. 0 version supports to send arbitrary key-value pairs. This only can be done using the agent default Agent broker provided in CEP by default.

When sending Send the arbitrary value , it needs to be send as in Map data format and both the key and value pair needs to be -value pairs in String format. 

 

1. Creating Create the agent and datapublisher data publisher to publish the data as follows.

Code Block
languagejava
linenumberstrue
AgentConfiguration agentConfiguration = new AgentConfiguration();
Agent agent = new Agent(agentConfiguration);
        
DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin", agent);

2. Defining  Define the stream as shown below.  

Code Block
languagejava
linenumberstrue
streamId = dataPublisher.defineStream("{" +
                                                  "  'name':'" + PHONE_RETAIL_STREAM + "'," +
                                                  "  'version':'" + VERSION + "'," +
                                                  "  'nickName': 'Phone_Retail_Shop'," +
                                                  "  'description': 'Phone Sales'," +
                                                  "  'metaData':[" +
                                                  "          {'name':'clientType','type':'STRING'}" +
                                                  "  ]," +
                                                  "  'payloadData':[" +
                                                  "          {'name':'brand','type':'STRING'}," +
                                                  "          {'name':'quantity','type':'INT'}," +
                                                  "          {'name':'total','type':'INT'}," +
                                                  "  ]" +
                                                  "}");

3. Publishing Publish the data as follows.

Code Block
languagejava
linenumberstrue
Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500});

dataPublisher.publish(eventOne); 

Here if we want Follow the steps below to publish the name of the buyer which that is not defined in the Stream, then we need do the below steps.

4. Defining Define a Map (To send the key-value pair).

Code Block
languagejava
linenumberstrue
Map arbitraryData = new HashMap<String,String>();
arbitraryData.put("buyer","Joe");
Note

Here Note the following rules when defining the key, we need to follow some rules

If you want to

:

  1. To define the key-value pair as a meta data, then follow the syntax eg : arbitraryData.put("meta.buyerbuyer","Joe");
  2. If you want to To define the key-value pair as correlation data, then follow the syntax eg : arbitraryData.put("correlation.buyer","Joe");
  3. If you want to To define the key-value pair as payload data, then follow the syntax eg : arbitraryData.put("buyer","Joe"); - No any prefix.

Other than the above mention situations, you can't use dot(.) for key.

 

5. Publish the map with the event.

Code Block
languagejava
linenumberstrue
Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500},arbitraryData);

dataPublisher.publish(eventOne);
Info
titleInfo

To receive arbitrary values in the CEP, bucket needs to be configured properly but it does not need any special configuration. We need to follow same approach which we are using for Tuple mapping.

  1. If we are defining a key-value pair as meta data then it needs to be configured as meta data in the input mapping of the bucket.
  2. If we are defining a key-value pair as correlation data then it needs to be configured as correlation data in the input mapping of the bucket.
  3. If we are defining a key-value pair as payload data then it needs to be configured as payload data in the input mapping of the bucket.

Example: If we defining the key-value as meta data then,

Image Added