This documentation is for WSO2 CEP 2.1.0.

This section describes the main methods available for sending WSO2 events to the Agent broker through custom data publishers.

With either method, you can publish data in three ways:

  • DataPublisher
  • AsyncDataPublisher
  • LoadBalancingDataPublisher

Info

For more information on data publishers, see this article in the WSO2 library: http://wso2.org/library/articles/2012/07/creating-custom-agents-publish-events-bamcep.

Whichever of the three publishers you use, define the stream that the data belongs to before you send any events.

Supporting Arbitrary Key-Value Pairs in WSO2 Events 

In previous CEP releases, every value sent in an event had to match the stream definition. From CEP 2.1.0 onward, you can also send arbitrary key-value pairs that are not defined in the stream. This is supported by the default Agent broker in CEP.

Send the arbitrary values as a Map in which both the keys and the values are Strings.
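Because both keys and values must be Strings, any non-String value has to be converted before it goes into the map. The following is a minimal sketch of one way to do that using only the JDK. The class name ArbitraryDataSketch, the method toStringMap, and the "discount" key are hypothetical and are not part of the WSO2 API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the WSO2 API): turns a map with
// arbitrary value types into the String-to-String map described above.
final class ArbitraryDataSketch {

    // Converts every value with String.valueOf, keeping insertion order.
    static Map<String, String> toStringMap(Map<String, ?> values) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        for (Map.Entry<String, ?> entry : values.entrySet()) {
            result.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<String, Object>();
        raw.put("buyer", "Joe");
        raw.put("discount", 5); // hypothetical numeric value
        System.out.println(toStringMap(raw)); // prints {buyer=Joe, discount=5}
    }
}
```

The resulting map can then be passed wherever a Map of String keys and String values is expected.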

1. Create the agent and data publisher to publish the data as follows.

AgentConfiguration agentConfiguration = new AgentConfiguration();
Agent agent = new Agent(agentConfiguration);
        
DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin", agent);

2. Define the stream as shown below.  

// PHONE_RETAIL_STREAM and VERSION are constants holding the stream name and version.
String streamId = dataPublisher.defineStream("{" +
        "  'name':'" + PHONE_RETAIL_STREAM + "'," +
        "  'version':'" + VERSION + "'," +
        "  'nickName': 'Phone_Retail_Shop'," +
        "  'description': 'Phone Sales'," +
        "  'metaData':[" +
        "          {'name':'clientType','type':'STRING'}" +
        "  ]," +
        "  'payloadData':[" +
        "          {'name':'brand','type':'STRING'}," +
        "          {'name':'quantity','type':'INT'}," +
        "          {'name':'total','type':'INT'}" +
        "  ]" +
        "}");
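Concatenating the definition by hand makes it easy to leave a stray comma or quote in the string. As a rough sketch, the definition could instead be assembled from attribute lists so that the separators are generated. StreamDefinitionSketch and its methods are hypothetical helpers written against the JDK only, the stream name and version in main are placeholders, and nickName and description are omitted for brevity.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of the WSO2 API): builds the
// single-quoted stream definition string from attribute lists.
final class StreamDefinitionSketch {

    // Renders one attribute entry, e.g. {'name':'brand','type':'STRING'}.
    static String attribute(String name, String type) {
        return "{'name':'" + name + "','type':'" + type + "'}";
    }

    // Joins entries with commas, so no separator follows the last entry.
    static String definition(String name, String version,
                             List<String> metaData, List<String> payloadData) {
        return "{'name':'" + name + "','version':'" + version + "',"
                + "'metaData':[" + String.join(",", metaData) + "],"
                + "'payloadData':[" + String.join(",", payloadData) + "]}";
    }

    public static void main(String[] args) {
        // "example.stream" and "1.0.0" are placeholder values.
        String definition = definition("example.stream", "1.0.0",
                Arrays.asList(attribute("clientType", "STRING")),
                Arrays.asList(attribute("brand", "STRING"),
                              attribute("quantity", "INT"),
                              attribute("total", "INT")));
        System.out.println(definition);
    }
}
```

The returned string would then be passed to dataPublisher.defineStream in place of the hand-written literal.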

3. Publish the data as follows.

Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500});

dataPublisher.publish(eventOne); 

Follow the steps below to also publish the buyer's name, which is not defined in the stream.

4. Define a Map to hold the key-value pair.

Map<String, String> arbitraryData = new HashMap<String, String>();
arbitraryData.put("buyer","Joe");

Note the following rules when defining the key:

  1. To define the key-value pair as meta data, prefix the key with meta.: arbitraryData.put("meta.buyer","Joe");
  2. To define the key-value pair as correlation data, prefix the key with correlation.: arbitraryData.put("correlation.buyer","Joe");
  3. To define the key-value pair as payload data, use the key with no prefix: arbitraryData.put("buyer","Joe");

Apart from the meta. and correlation. prefixes above, a key must not contain a dot (.).
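The key rules above can be checked before publishing. The sketch below is a hypothetical validator using only the JDK; ArbitraryKeyRules, Section, and sectionOf are illustrative names, not WSO2 classes. It assumes that a dot is allowed only as the separator after the meta or correlation prefix, which is one reading of the rule above.

```java
// Hypothetical validator (not part of the WSO2 API) for arbitrary-data keys.
final class ArbitraryKeyRules {

    enum Section { META, CORRELATION, PAYLOAD }

    private static final String META_PREFIX = "meta.";
    private static final String CORRELATION_PREFIX = "correlation.";

    // Returns the section a key maps to, or throws if the key is empty
    // or contains a dot anywhere other than after a recognised prefix.
    static Section sectionOf(String key) {
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("Key must not be empty");
        }
        Section section = Section.PAYLOAD;
        String name = key;
        if (key.startsWith(META_PREFIX)) {
            section = Section.META;
            name = key.substring(META_PREFIX.length());
        } else if (key.startsWith(CORRELATION_PREFIX)) {
            section = Section.CORRELATION;
            name = key.substring(CORRELATION_PREFIX.length());
        }
        if (name.isEmpty() || name.indexOf('.') >= 0) {
            throw new IllegalArgumentException("Invalid arbitrary key: " + key);
        }
        return section;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"meta.buyer", "correlation.buyer", "buyer"}) {
            System.out.println(key + " -> " + sectionOf(key));
        }
    }
}
```

Under this assumption, a key such as "shop.buyer" is rejected with an IllegalArgumentException, while the three forms listed above are accepted.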

5. Publish the map with the event.

// A new variable name avoids redeclaring eventOne from step 3.
Event eventTwo = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                                   new Object[]{"IBM", 45, 2500}, arbitraryData);

dataPublisher.publish(eventTwo);