Custom fields with data streams
Data publishers generally allow you to send data to the BAM/CEP server only through a predefined set of data fields specified in the stream definition. This is a fixed set of fields you send through the data bridge. In addition, you can send custom key-value pairs with data events.
...
- When the key starts with meta: the data field is considered a metadata custom field. It is sent with the metadata and saved in Cassandra with the key prefix meta_.
- When the key starts with correlation: the data field is considered a correlation data custom field. It is sent with the correlation data and saved in Cassandra with the key prefix correlation_.
- When the key starts with payload, or with any other string: the data field is considered a payload data custom field. It is sent with the payload data and saved in Cassandra with the key prefix payload_.
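For example, a custom field is attached to an event through its arbitrary data map. The sketch below is illustrative (the class name and field values are hypothetical); it uses the same Event API as the full sample further down:

```java
package org.wso2.carbon.bam.sample;

import org.wso2.carbon.databridge.commons.Event;

import java.util.HashMap;

public class CustomFieldExample {

    public static Event buildEvent() {
        // Custom key-value pairs travel in the event's arbitrary data map.
        HashMap<String, String> customFields = new HashMap<String, String>();
        // No meta/correlation prefix: treated as a payload custom field and
        // saved in Cassandra as payload_location.
        customFields.put("location", "Colombo");

        Event event = new Event();
        event.setMetaData(new Object[]{"10.100.3.173"});
        event.setPayloadData(new Object[]{"Sophia", "Mon", 36, 1, 0});
        event.setArbitraryDataMap(customFields);
        return event;
    }
}
```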
...
To publish data to BAM through a custom data agent, you need the following dependencies. You can either add the JARs below to your classpath:
- commons-logging-1.1.1.jar
- commons-pool-1.5.0.wso2v1.jar
- commons-pool-1.5.jar
- gson-2.1.jar
- httpclient-4.1.1-wso2v1.jar
- json-2.0.0.wso2v1.jar
- libthrift-0.7.wso2v1.jar
- org.apache.log4j-1.2.13.v200706111418.jar
- org.eclipse.osgi-3.8.1.v20120830-144521.jar
- org.eclipse.osgi.services-3.3.100.v20120522-1822.jar
- org.wso2.carbon.bootstrap-4.2.0.jar
- org.wso2.carbon.databridge.agent.thrift_4.2.0.jar
- org.wso2.carbon.databridge.commons_4.2.0.jar
- org.wso2.carbon.databridge.commons.thrift_4.2.0.jar
- org.wso2.carbon.logging-4.2.0.jar
- slf4j-1.5.10.wso2v1.jar
- wrapper-3.2.3.jar
or add the following Maven repository and dependency entries to your pom file:
```xml
<repositories>
    <repository>
        <id>org.wso2.carbon</id>
        <url>http://dist.wso2.org/maven2/</url>
    </repository>
</repositories>
```
```xml
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.commons</artifactId>
    <version>4.2.0</version>
</dependency>
```
...
```java
package org.wso2.carbon.bam.sample;

import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Event;

import java.util.HashMap;

public class VariableKeyValueAgent {

    private static Logger logger = Logger.getLogger(VariableKeyValueAgent.class);

    public static final String CALL_CENTER_DATA_STREAM = "callcenter_stream";
    public static final String VERSION = "1.0.0";
    public static final String DATA = "Sophia,Mon,36,1,0,Colombo\n" +
            "Jacob,Mon,55,0,1,Horana\n" +
            "Mason,Mon,150,0,1\n" +
            "William,Mon,10,1,0,Colombo\n" +
            "Jayden,Mon,15,1,0,Galle\n" +
            "Michael,Mon,25,1,0\n" +
            "Emma,Mon,40,1,0,Colombo";

    public static void main(String[] args) {
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        System.setProperty("javax.net.ssl.trustStore",
                "/mnt/windows/Users/chamith/Desktop/share/releases/bam2/230/wso2bam-2.3.0/repository/resources/security/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        Agent agent = new Agent(agentConfiguration);

        // Using the asynchronous data publisher
        AsyncDataPublisher asyncDataPublisher =
                new AsyncDataPublisher("tcp://localhost:7611", "admin", "admin", agent);

        String streamDefinition = "{" +
                " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
                " 'version':'" + VERSION + "'," +
                " 'nickName': 'Phone_Retail_Shop'," +
                " 'description': 'Phone Sales'," +
                " 'metaData':[" +
                "     {'name':'publisherIP','type':'STRING'}" +
                " ]," +
                " 'payloadData':[" +
                "     {'name':'name','type':'STRING'}," +
                "     {'name':'day','type':'STRING'}," +
                "     {'name':'timeToRespond','type':'INT'}," +
                "     {'name':'answered','type':'INT'}," +
                "     {'name':'abandoned','type':'INT'}" +
                " ]" +
                "}";
        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);

        publishEvents(asyncDataPublisher);
    }

    private static void publishEvents(AsyncDataPublisher asyncDataPublisher) {
        String[] dataRow = DATA.split("\n");
        for (String row : dataRow) {
            String[] data = row.split(",");
            Object[] payload = new Object[]{
                    data[0], data[1], Integer.parseInt(data[2]),
                    Integer.parseInt(data[3]), Integer.parseInt(data[4])
            };
            HashMap<String, String> map = new HashMap<String, String>();
            // Calling location information included.
            if (data.length == 6) {
                map.put("location", data[5]);
            }
            Event event = eventObject(null, new Object[]{"10.100.3.173"}, payload, map);
            try {
                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
            } catch (AgentException e) {
                logger.error("Failed to publish event", e);
            }
        }
    }

    private static Event eventObject(Object[] correlationData, Object[] metaData,
                                     Object[] payLoadData, HashMap<String, String> map) {
        Event event = new Event();
        event.setCorrelationData(correlationData);
        event.setMetaData(metaData);
        event.setPayloadData(payLoadData);
        event.setArbitraryDataMap(map);
        return event;
    }
}
```
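Note that location does not appear in the stream definition's payloadData; it is sent only in the arbitrary data map. Because its key carries no meta or correlation prefix, it is persisted as a payload custom field under the column payload_location.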
After publishing the call center data, you can query it with variable keys using Hive scripts.
...
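The following is a minimal sketch of what such a Hive query could look like, assuming the default EVENT_KS keyspace, the Cassandra storage handler shipped with BAM, and a column family named after the stream. The table and column names are illustrative; adjust the connection properties to match your setup.

```sql
-- Illustrative sketch: map the stream's Cassandra column family to a Hive table.
-- The custom field sent in the arbitrary data map is exposed as payload_location.
CREATE EXTERNAL TABLE IF NOT EXISTS CallCenterData
    (messageRowID STRING, name STRING, day STRING,
     timeToRespond INT, location STRING)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
    "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "callcenter_stream",
    "cassandra.columns.mapping" =
        ":key,payload_name,payload_day,payload_timeToRespond,payload_location");

-- Query on the custom field exactly as on any fixed payload field.
SELECT name, day, timeToRespond
FROM CallCenterData
WHERE location = 'Colombo'
ORDER BY timeToRespond;
```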
This is the result of the query:
| name | day | time to respond |
|---|---|---|
| William | Mon | 10 |
| Sophia | Mon | 36 |
| Emma | Mon | 40 |