Data Publisher
Custom fields with data stream
A data publisher normally sends data to the BAM/CEP server using a predefined set of data fields specified in the stream definition. These fixed fields are sent through a data bridge. In addition to them, you can also send custom key-value pairs with data events.
The data bridge data agent has a Map data structure that enables you to send an arbitrary number of String key-value pairs, alongside the three Object arrays that carry the metadata, correlation data, and payload data fields of the fixed stream definition. The key-value pairs in the Map can change from message to message, but both keys and values must be of type String.
These custom key-value pairs fall into three groups, depending on how they are transmitted and stored:
- When the key starts with meta: the field is treated as a metadata custom field. It is sent with the metadata and saved in Cassandra with the key prefix meta_.
- When the key starts with correlation: the field is treated as a correlation data custom field. It is sent with the correlation data and saved in Cassandra with the key prefix correlation_.
- When the key starts with payload, or with any other string: the field is treated as a payload data custom field. It is sent with the payload data and saved in Cassandra with the key prefix payload_.
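As a sketch, the prefix-based grouping above can be expressed in plain Java. The class and method names here are illustrative only, not part of the data bridge API; they simply show which Cassandra key prefix each arbitrary key would receive:

```java
// Illustrative helper (not part of the data bridge API): shows how arbitrary
// map keys are grouped by prefix when an event is persisted to Cassandra.
public class ArbitraryKeyClassifier {

    // Returns the Cassandra key prefix applied to a custom key-value pair.
    public static String cassandraPrefix(String key) {
        if (key.startsWith("meta")) {
            return "meta_";          // sent with metadata
        } else if (key.startsWith("correlation")) {
            return "correlation_";   // sent with correlation data
        } else {
            return "payload_";       // "payload" or any other string
        }
    }

    public static void main(String[] args) {
        System.out.println(cassandraPrefix("metaRegion"));       // meta_
        System.out.println(cassandraPrefix("correlationTxn"));   // correlation_
        System.out.println(cassandraPrefix("location"));         // payload_
    }
}
```

For example, the `location` field published in the call center sample below carries no recognized prefix, so it is stored as a payload custom field.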
Dependencies
In order to publish data to BAM through a custom data agent, you need to add dependencies in one of the following ways:
- Option 1: Add the following JARs to the class path:
- ant-1.7.0.jar
- ant-1.7.0.wso2v1.jar
- ant-launcher-1.7.0.jar
- axiom-1.2.11.wso2v4.jar
- axiom-api-1.2.11.jar
- axiom-impl-1.2.11.jar
- axis2-1.6.1.wso2v7.jar
- backport-util-concurrent-3.1.jar
- commons-cli-1.0.jar
- commons-codec-1.2.jar
- commons-collections-3.2.jar
- commons-fileupload-1.2.0.wso2v1.jar
- commons-fileupload-1.2.jar
- commons-httpclient-3.1.0.wso2v2.jar
- commons-httpclient-3.1.jar
- commons-io-2.0.jar
- commons-lang-1.0.jar
- commons-logging-1.1.1.jar
- commons-pool-1.5.0.wso2v1.jar
- commons-pool-1.5.jar
- dom4j-1.6.1.jar
- ehcache-1.5.0.jar
- ehcache-1.5.0.wso2v1.jar
- geronimo-activation_1.1_spec-1.0.2.jar
- geronimo-javamail_1.4_spec-1.6.jar
- geronimo-stax-api_1.0_spec-1.0.1.jar
- gson-2.1.jar
- httpclient-4.1.1-wso2v1.jar
- httpcore-4.1.0-wso2v1.jar
- icu4j-2.6.1.jar
- infinispan-core-5.1.2.wso2v1.jar
- javax.servlet-3.0.0.v201112011016.jar
- jaxen-1.1.1.jar
- jboss-logging-3.1.0.CR2.jar
- jboss-logging-3.1.0.wso2v1.jar
- jboss-marshalling-1.3.6.GA.jar
- jboss-marshalling-river-1.3.6.GA.jar
- jdom-1.0.jar
- jgroups-3.0.6.Final.jar
- jgroups-3.0.6.wso2v1.jar
- jline-0.9.94.jar
- json-2.0.0.wso2v1.jar
- jsr107cache-1.1.0-wso2v2.jar
- junit-3.8.1.jar
- libthrift-0.7.wso2v1.jar
- log4j-1.2.14.jar
- marshalling-1.3.6.wso2v1.jar
- not-yet-commons-ssl-0.3.9.jar
- org.apache.log4j-1.2.13.v200706111418.jar
- org.eclipse.osgi-3.8.1.v20120830-144521.jar
- org.eclipse.osgi.services-3.3.100.v20120522-1822.jar
- org.wso2.carbon.base-4.1.0.jar
- org.wso2.carbon.caching.core-4.1.0.jar
- org.wso2.carbon.core.common-4.1.0.jar
- org.wso2.carbon.databridge.agent.thrift-4.1.2.jar
- org.wso2.carbon.databridge.commons-4.1.2.jar
- org.wso2.carbon.databridge.commons.thrift-4.1.0.jar
- org.wso2.carbon.logging-4.1.0.jar
- org.wso2.carbon.queuing-4.1.0.jar
- org.wso2.carbon.registry.api-4.1.0.jar
- org.wso2.carbon.securevault-4.1.0.jar
- org.wso2.carbon.user.api-4.1.0.jar
- org.wso2.carbon.utils-4.1.0.jar
- org.wso2.securevault-1.0.0-wso2v2.jar
- slf4j-1.5.10.wso2v1.jar
- smack-3.0.4.wso2v1.jar
- smackx-3.0.4.wso2v1.jar
- stax2-api-3.1.1.jar
- woodstox-core-asl-4.1.1.jar
- wstx-asl-3.2.9.jar
- xalan-2.6.0.jar
- xercesImpl-2.6.2.jar
- xml-apis-1.3.02.jar
- xmlParserAPIs-2.6.2.jar
- xom-1.0.jar
- Option 2: Add the WSO2 Maven repository and the following Maven dependencies to the pom file:

Maven repository:

```xml
<repositories>
    <repository>
        <id>org.wso2.carbon</id>
        <url>http://dist.wso2.org/maven2/</url>
    </repository>
</repositories>
```

Maven pom dependencies:

```xml
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
    <version>4.1.2</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.commons</artifactId>
    <version>4.1.2</version>
</dependency>
```
Publish and query data with variable keys
This feature is useful when you don't have a predefined format for sending data. You can use it to publish additional data that is not defined in the stream definition.
For example, assume you are collecting and analyzing data from a call center. This data contains details that are always available, such as the caller's name, the day, the time taken to respond (in seconds), and whether the call was answered or abandoned. Since this information is always available, it can be defined in a stream definition as follows.
```json
{
    "correlationData" : "null",
    "description" : "Phone Sales",
    "metaData" : [
        { "name" : "publisherIP", "type" : "STRING" }
    ],
    "name" : "org.wso2.carbon.bam.call.center.kpi",
    "nickName" : "Phone_Retail_Shop",
    "payloadData" : [
        { "name" : "name", "type" : "STRING" },
        { "name" : "day", "type" : "STRING" },
        { "name" : "timeToRespond", "type" : "INT" },
        { "name" : "answered", "type" : "INT" },
        { "name" : "abandoned", "type" : "INT" }
    ],
    "streamId" : "org.wso2.carbon.bam.call.center.kpi:1.0.0",
    "tags" : "null",
    "version" : "1.0.0"
}
```
However, some information, such as the caller's location, is not always available. This kind of information can be published as additional data in variable key-value pairs. You can use the data agent API to publish variable keys and values as shown in the following example.
```java
package org.wso2.carbon.bam.sample;

import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Event;

import java.util.HashMap;

public class VariableKeyValueAgent {

    private static Logger logger = Logger.getLogger(VariableKeyValueAgent.class);

    public static final String CALL_CENTER_DATA_STREAM = "callcenter_stream";
    public static final String VERSION = "1.0.0";
    public static final String DATA =
            "Sophia,Mon,36,1,0,Colombo\n" +
            "Jacob,Mon,55,0,1,Horana\n" +
            "Mason,Mon,150,0,1\n" +
            "William,Mon,10,1,0,Colombo\n" +
            "Jayden,Mon,15,1,0,Galle\n" +
            "Michael,Mon,25,1,0\n" +
            "Emma,Mon,40,1,0,Colombo";

    public static void main(String[] args) {
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        System.setProperty("javax.net.ssl.trustStore",
                "/mnt/windows/Users/chamith/Desktop/share/releases/bam2/230/wso2bam-2.3.0/repository/resources/security/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        Agent agent = new Agent(agentConfiguration);

        // Using the asynchronous data publisher
        AsyncDataPublisher asyncDataPublisher =
                new AsyncDataPublisher("tcp://localhost:7611", "admin", "admin", agent);

        String streamDefinition = "{" +
                " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
                " 'version':'" + VERSION + "'," +
                " 'nickName': 'Phone_Retail_Shop'," +
                " 'description': 'Phone Sales'," +
                " 'metaData':[" +
                "     {'name':'publisherIP','type':'STRING'}" +
                " ]," +
                " 'payloadData':[" +
                "     {'name':'name','type':'STRING'}," +
                "     {'name':'day','type':'STRING'}," +
                "     {'name':'timeToRespond','type':'INT'}," +
                "     {'name':'answered','type':'INT'}," +
                "     {'name':'abandoned','type':'INT'}" +
                " ]" +
                "}";
        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);

        publishEvents(asyncDataPublisher);
    }

    private static void publishEvents(AsyncDataPublisher asyncDataPublisher) {
        String[] dataRow = DATA.split("\n");
        for (String row : dataRow) {
            String[] data = row.split(",");
            Object[] payload = new Object[]{
                    data[0], data[1],
                    Integer.parseInt(data[2]),
                    Integer.parseInt(data[3]),
                    Integer.parseInt(data[4])
            };
            HashMap<String, String> map = new HashMap<String, String>();
            // Caller location information is included only when available.
            if (data.length == 6) {
                map.put("location", data[5]);
            }
            Event event = eventObject(null, new Object[]{"10.100.3.173"}, payload, map);
            try {
                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
            } catch (AgentException e) {
                logger.error("Failed to publish event", e);
            }
        }
    }

    private static Event eventObject(Object[] correlationData, Object[] metaData,
                                     Object[] payLoadData, HashMap<String, String> map) {
        Event event = new Event();
        event.setCorrelationData(correlationData);
        event.setMetaData(metaData);
        event.setPayloadData(payLoadData);
        event.setArbitraryDataMap(map);
        return event;
    }
}
```
After publishing the call center data, you can query it with variable keys using Hive scripts.
For example, assume you want to find the callers who called from the city of Colombo. The following Hive query shows how.
```sql
CREATE EXTERNAL TABLE IF NOT EXISTS CustomerServiceDeptStats (
    key STRING, name STRING, day STRING, timeToRespond INT,
    answered INT, abandoned INT, value map<string,string>)
STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
WITH SERDEPROPERTIES (
    "cassandra.host" = "127.0.0.1",
    "cassandra.port" = "9160",
    "cassandra.ks.name" = "EVENT_KS",
    "cassandra.ks.username" = "admin",
    "cassandra.ks.password" = "admin",
    "cassandra.cf.name" = "org_wso2_carbon_bam_call_center_kpi",
    "cassandra.columns.mapping" = ":key,payload_name,payload_day,payload_timeToRespond,payload_answered,payload_abandoned,map:" );

select name, day, timeToRespond from CustomerServiceDeptStats where value['location'] = 'Colombo';
```
This is the result of the query:
| name    | day | timeToRespond |
|---------|-----|---------------|
| William | Mon | 10            |
| Sophia  | Mon | 36            |
| Emma    | Mon | 40            |