This post discusses how to send WSO2 events to the Agent Broker. There are two main ways to do this:
- Java API (See Example)
- REST API (See Example)
With either method, data can be published in three ways:
- DataPublisher
- AsyncDataPublisher
- Load Balancing DataPublisher
The following blog post and article provide more information about these three ways of publishing data.
http://sinthu-rajan.blogspot.com/2012/12/non-blocking-data-publshing-for-bamcep.html
http://wso2.org/library/articles/2012/07/creating-custom-agents-publish-events-bamcep
Whichever publisher is used, the stream definition must be defined before any events are sent.
In previous CEP versions, every value sent in an event had to match the stream definition. CEP 2.1.0 additionally supports sending arbitrary key-value pairs that are not defined in the stream.
The next section covers this in more detail.
Supporting arbitrary key-value pairs in WSO2 events
Previous CEP versions do not support sending arbitrary key-value pairs that are not defined in the stream.
WSO2 CEP 2.1.0 does support this, but only through the agent broker that CEP provides by default.
Arbitrary values must be sent as a Map, and both the key and the value of each pair must be Strings.
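Since both keys and values have to be Strings, values of other types need converting before they go into the map. The following is a minimal sketch of one way to do that with plain Java collections. The class name, the helper toStringMap, and the "loyaltyPoints" key are hypothetical and not part of the WSO2 API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ArbitraryDataSketch {

    // Hypothetical helper (not part of WSO2): copies the source map and
    // converts every value to its String form. A null value becomes "null".
    static Map<String, String> toStringMap(Map<String, ?> source) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        for (Map.Entry<String, ?> entry : source.entrySet()) {
            result.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = new LinkedHashMap<String, Object>();
        raw.put("buyer", "Joe");
        raw.put("loyaltyPoints", 120); // illustrative key only
        System.out.println(toStringMap(raw)); // prints {buyer=Joe, loyaltyPoints=120}
    }
}
```

The resulting map can then be used wherever a String-to-String map of arbitrary data is expected.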
1. Creating the agent and the DataPublisher to publish the data
AgentConfiguration agentConfiguration = new AgentConfiguration();
Agent agent = new Agent(agentConfiguration);
DataPublisher dataPublisher = new DataPublisher("tcp://localhost:7611", "admin", "admin", agent);
2. Defining the stream
String streamId = dataPublisher.defineStream("{" +
        " 'name':'" + PHONE_RETAIL_STREAM + "'," +
        " 'version':'" + VERSION + "'," +
        " 'nickName': 'Phone_Retail_Shop'," +
        " 'description': 'Phone Sales'," +
        " 'metaData':[" +
        " {'name':'clientType','type':'STRING'}" +
        " ]," +
        " 'payloadData':[" +
        " {'name':'brand','type':'STRING'}," +
        " {'name':'quantity','type':'INT'}," +
        " {'name':'total','type':'INT'}" +  // no comma after the last attribute
        " ]" +
        "}");
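Hand-concatenated definitions like this are easy to get wrong, for example by leaving a stray comma after the last attribute. As a rough sketch, the definition string could instead be assembled by a small helper. The class and method names are hypothetical, the stream name and version in main are placeholder values, and the helper covers only the name, version, metaData and payloadData fields and does no escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StreamDefinitionSketch {

    // Renders name/type pairs as an array, with commas only between entries.
    static String attributes(Map<String, String> nameToType) {
        StringBuilder out = new StringBuilder("[");
        String separator = "";
        for (Map.Entry<String, String> e : nameToType.entrySet()) {
            out.append(separator)
               .append("{'name':'").append(e.getKey())
               .append("','type':'").append(e.getValue()).append("'}");
            separator = ",";
        }
        return out.append("]").toString();
    }

    // Builds a definition string in the same single-quoted style as the post.
    static String definition(String name, String version,
                             Map<String, String> meta, Map<String, String> payload) {
        return "{'name':'" + name + "','version':'" + version + "',"
                + "'metaData':" + attributes(meta) + ","
                + "'payloadData':" + attributes(payload) + "}";
    }

    public static void main(String[] args) {
        Map<String, String> meta = new LinkedHashMap<String, String>();
        meta.put("clientType", "STRING");
        Map<String, String> payload = new LinkedHashMap<String, String>();
        payload.put("brand", "STRING");
        payload.put("quantity", "INT");
        payload.put("total", "INT");
        // "phone_retail_stream" and "1.0.0" are placeholder values.
        System.out.println(definition("phone_retail_stream", "1.0.0", meta, payload));
    }
}
```

The returned string could then be passed to dataPublisher.defineStream(...) in place of the literal.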
3. Publishing the data
Event eventOne = new Event(streamId, System.currentTimeMillis(),
        new Object[]{"external"}, null, new Object[]{"IBM", 45, 2500});
dataPublisher.publish(eventOne);
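Because the values defined in the stream have to match the stream definition, it can help to check a payload array before publishing it. The sketch below is one possible client-side check, not something the WSO2 agent provides. It assumes that STRING attributes are sent as java.lang.String and INT attributes as java.lang.Integer, and the class and method names are hypothetical.

```java
final class PayloadCheckSketch {

    // Assumed Java types for the payload of the phone retail stream:
    // brand (STRING), quantity (INT), total (INT).
    static final Class<?>[] PHONE_RETAIL_PAYLOAD = {
        String.class, Integer.class, Integer.class
    };

    // Returns true when the array has the expected length and every
    // value is an instance of the expected type at its position.
    static boolean matches(Class<?>[] expectedTypes, Object[] values) {
        if (values == null || values.length != expectedTypes.length) {
            return false;
        }
        for (int i = 0; i < values.length; i++) {
            if (!expectedTypes[i].isInstance(values[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same payload as the event above: matches the definition.
        System.out.println(matches(PHONE_RETAIL_PAYLOAD, new Object[]{"IBM", 45, 2500}));   // prints true
        // Quantity sent as a String: does not match.
        System.out.println(matches(PHONE_RETAIL_PAYLOAD, new Object[]{"IBM", "45", 2500})); // prints false
    }
}
```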
To also publish the name of the buyer, which is not defined in the stream, follow the steps below.
4. Defining a Map (to send the key-value pair)
Map<String, String> arbitraryData = new HashMap<String, String>();
arbitraryData.put("buyer", "Joe");
The key must follow these rules:
- To send the key-value pair as meta data, prefix the key with "meta.", e.g. arbitraryData.put("meta.buyer","Joe");
- To send the key-value pair as correlation data, prefix the key with "correlation.", e.g. arbitraryData.put("correlation.buyer","Joe");
- To send the key-value pair as payload data, use the key without any prefix, e.g. arbitraryData.put("buyer","Joe");
Apart from these two prefixes, a key must not contain a dot (.).
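The key rules above can be expressed as a small check that runs before a key is added to the map. This is only a sketch of one reading of those rules, namely that a dot is allowed solely as part of the "meta." or "correlation." prefix. The class and method names are hypothetical and not part of the WSO2 API.

```java
final class ArbitraryKeySketch {

    static final String META_PREFIX = "meta.";
    static final String CORRELATION_PREFIX = "correlation.";

    // Returns true when the key is non-empty and contains no dot
    // other than the one in a leading "meta." or "correlation." prefix.
    static boolean isValidKey(String key) {
        if (key == null || key.isEmpty()) {
            return false;
        }
        String name = key;
        if (key.startsWith(META_PREFIX)) {
            name = key.substring(META_PREFIX.length());
        } else if (key.startsWith(CORRELATION_PREFIX)) {
            name = key.substring(CORRELATION_PREFIX.length());
        }
        return !name.isEmpty() && name.indexOf('.') < 0;
    }

    public static void main(String[] args) {
        System.out.println(isValidKey("meta.buyer"));        // prints true
        System.out.println(isValidKey("correlation.buyer")); // prints true
        System.out.println(isValidKey("buyer"));             // prints true
        System.out.println(isValidKey("shop.buyer"));        // prints false
    }
}
```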
5. Publishing the map with the event
// Same event as in step 3, with the arbitrary map passed as the last constructor argument
Event eventOne = new Event(streamId, System.currentTimeMillis(),
        new Object[]{"external"}, null, new Object[]{"IBM", 45, 2500}, arbitraryData);
dataPublisher.publish(eventOne);