This documentation is for WSO2 CEP 2.1.0. View the home page of the latest release.

Custom Data Publishers

This section describes the following main methods available to send WSO2 events to Agent broker via Custom Data Publishers.

In both methods above, we can publish data in 3 ways as follows.

  • Data Publisher
  • AsyncDataPublisher
  • Load Balancing DataPublisher

Info

For more information on data publishers, refer to this article in WSO2 library: http://wso2.org/library/articles/2012/07/creating-custom-agents-publish-events-bamcep.

When publishing data using any of the three methods mentioned above, you should define the stream definition to send data before sending events. 

Supporting Arbitrary Key-Value Pairs in WSO2 Events 

Although in previous CEP release versions, all values sent in the event must match the stream definition, CEP version 2.1.0 and after have support to send arbitrary key-value pairs, which are not defined in the stream. This can be done using the default Agent broker in CEP.

Send the arbitrary value in Map data format and the key-value pairs in String format. 

1. Create the agent and data publisher to publish the data as follows.

AgentConfiguration agentConfiguration = new AgentConfiguration();
Agent agent = new Agent(agentConfiguration);
        
DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin", agent);

2. Define the stream as shown below.  

streamId = dataPublisher.defineStream("{" +
                                                  "  'name':'" + PHONE_RETAIL_STREAM + "'," +
                                                  "  'version':'" + VERSION + "'," +
                                                  "  'nickName': 'Phone_Retail_Shop'," +
                                                  "  'description': 'Phone Sales'," +
                                                  "  'metaData':[" +
                                                  "          {'name':'clientType','type':'STRING'}" +
                                                  "  ]," +
                                                  "  'payloadData':[" +
                                                  "          {'name':'brand','type':'STRING'}," +
                                                  "          {'name':'quantity','type':'INT'}," +
                                                  "          {'name':'total','type':'INT'}," +
                                                  "  ]" +
                                                  "}");

3. Publish the data as follows.

Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500});

dataPublisher.publish(eventOne); 

Follow the steps below to publish the name of the buyer that is not defined in the Stream.

4. Define a Map (To send the key-value pair).

Map arbitraryData = new HashMap<String,String>();
arbitraryData.put("buyer","Joe");

Note the following rules when defining the key:

  1. To define the key-value pair as meta data, follow the syntax arbitraryData.put("meta.buyer","Joe");
  2. To define the key-value pair as correlation data, follow the syntax arbitraryData.put("correlation.buyer","Joe");
  3. To define the key-value pair as payload data, follow the syntax arbitraryData.put("buyer","Joe"); - No prefix.

Other than the above, you can't use dot(.) for key.

5. Publish the map with the event.

Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500},arbitraryData);

dataPublisher.publish(eventOne);

Info

To receive arbitrary values in the CEP, bucket needs to be configured properly but it does not need any special configuration. We need to follow same approach which we are using for Tuple mapping.

  1. If we are defining a key-value pair as meta data then it needs to be configured as meta data in the input mapping of the bucket.
  2. If we are defining a key-value pair as correlation data then it needs to be configured as correlation data in the input mapping of the bucket.
  3. If we are defining a key-value pair as payload data then it needs to be configured as payload data in the input mapping of the bucket.

Example: If we defining the key-value as meta data then,

 

 

Copyright © WSO2 Inc. 2005-2014