Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Migrated to Confluence 5.3

 Custom fields with data stream

Data publishers generally allow you to send data to the BAM/CEP server only through the predefined set of data fields specified in the stream definition. This is a fixed set of data fields that you send through a data bridge. You can also send custom key-value pairs with data events.

...

  1. When the key starts with meta: The data field is considered as a metadata custom field. It is sent with metadata and saved in Cassandra with key prefix meta_.
  2. When the key starts with correlation: The data field is considered as a correlation data custom field. It is sent with correlation data and saved in Cassandra with key prefix correlation_.
  3. When the key starts with payload or any other string: The data field is considered as a payload data custom field. It is sent with payload data and saved in Cassandra with key prefix payload_.

...

In order to publish data to BAM through a custom data agent, you need the following dependencies.

You can either add the following JARs to your classpath

  • commons-logging-1.1.1.jar
  • commons-pool-1.5.0.wso2v1.jar
  • commons-pool-1.5.jar
  • gson-2.1.jar
  • httpclient-4.1.1-wso2v1.jar
  • json-2.0.0.wso2v1.jar
  • libthrift-0.7.wso2v1.jar
  • org.apache.log4j-1.2.13.v200706111418.jar
  • org.eclipse.osgi-3.8.1.v20120830-144521.jar
  • org.eclipse.osgi.services-3.3.100.v20120522-1822.jar
  • org.wso2.carbon.bootstrap-4.2.0.jar
  • org.wso2.carbon.databridge.agent.thrift_4.2.0.jar
  • org.wso2.carbon.databridge.commons_4.2.0.jar
  • org.wso2.carbon.databridge.commons.thrift_4.2.0.jar
  • org.wso2.carbon.logging-4.2.0.jar
  • slf4j-1.5.10.wso2v1.jar
  • wrapper-3.2.3.jar
or you can add the following Maven repository and dependency entries to your pom file.

 

Code Block
languagehtml/xml
titleMaven repository
<repositories>
    <repository>
        <id>org.wso2.carbon</id>
        <url>http://dist.wso2.org/maven2/</url>
    </repository>
</repositories>
Code Block
languagehtml/xml
titleMaven pom dependency
<dependency>
	<groupId>org.wso2.carbon</groupId> 
	<artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>

	<version>4.2.0</version>
</dependency> 
<dependency>
	<groupId>org.wso2.carbon</groupId> 
	<artifactId>org.wso2.carbon.databridge.commons</artifactId> 
	<version>4.2.0</version> 
</dependency>

 

...

Code Block
languagejava
titleSample agent
package org.wso2.carbon.bam.sample;

import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Event;
import java.util.HashMap;

/**
 * Sample data agent that publishes call-center records through an
 * {@link AsyncDataPublisher} to the receiver at {@code tcp://localhost:7611}.
 *
 * <p>Each record carries five fixed payload fields (name, day, timeToRespond,
 * answered, abandoned). When a record has a sixth field it is attached to the
 * event as a custom key-value pair under the key {@code location}.
 */
public class VariableKeyValueAgent {
    private static final Logger logger = Logger.getLogger(VariableKeyValueAgent.class);
    public static final String CALL_CENTER_DATA_STREAM = "callcenter_stream";
    public static final String VERSION = "1.0.0";
    public static final String DATA = "Sophia,Mon,36,1,0,Colombo\n" +
                                      "Jacob,Mon,55,0,1,Horana\n" +
                                      "Mason,Mon,150,0,1\n" +
                                      "William,Mon,10,1,0,Colombo\n" +
                                      "Jayden,Mon,15,1,0,Galle\n" +
                                      "Michael,Mon,25,1,0\n" +
                                      "Emma,Mon,40,1,0,Colombo";

    private static final String TRUST_STORE_PROPERTY = "javax.net.ssl.trustStore";
    private static final String TRUST_STORE_PASSWORD_PROPERTY = "javax.net.ssl.trustStorePassword";
    // Fallback values used only when the corresponding system property is not
    // already set (for example with -Djavax.net.ssl.trustStore=... on the command line).
    private static final String DEFAULT_TRUST_STORE =
            "/mnt/windows/Users/chamith/Desktop/share/releases/bam2/230/wso2bam-2.3.0/repository/resources/security/client-truststore.jks";
    private static final String DEFAULT_TRUST_STORE_PASSWORD = "wso2carbon";

    // Number of fixed fields every data row must contain.
    private static final int FIXED_FIELD_COUNT = 5;
    // Row length when the optional location field is present.
    private static final int FIELD_COUNT_WITH_LOCATION = 6;

    /**
     * Configures the trust store, registers the stream definition and
     * publishes every row of {@link #DATA}.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        // Do not clobber a trust store the user configured explicitly.
        setPropertyIfAbsent(TRUST_STORE_PROPERTY, DEFAULT_TRUST_STORE);
        setPropertyIfAbsent(TRUST_STORE_PASSWORD_PROPERTY, DEFAULT_TRUST_STORE_PASSWORD);
        Agent agent = new Agent(agentConfiguration);
        //Using Asynchronous data publisher
        AsyncDataPublisher asyncDataPublisher = new AsyncDataPublisher("tcp://localhost:7611", "admin", "admin", agent);
        String streamDefinition = "{" +
                                  " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
                                  " 'version':'" + VERSION + "'," +
                                  " 'nickName': 'Phone_Retail_Shop'," +
                                  " 'description': 'Phone Sales'," +
                                  " 'metaData':[" +
                                  " {'name':'publisherIP','type':'STRING'}" +
                                  " ]," +
                                  " 'payloadData':[" +
                                  " {'name':'name','type':'STRING'}," +
                                  " {'name':'day','type':'STRING'}," +
                                  " {'name':'timeToRespond','type':'INT'}," +
                                  " {'name':'answered','type':'INT'}," +
                                  " {'name':'abandoned','type':'INT'}" +
                                  " ]" +
                                  "}";
        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
        publishEvents(asyncDataPublisher);
    }

    /**
     * Sets a system property only when it currently has no value.
     *
     * @param key          system property name
     * @param defaultValue value to apply when the property is unset
     */
    private static void setPropertyIfAbsent(String key, String defaultValue) {
        if (System.getProperty(key) == null) {
            System.setProperty(key, defaultValue);
        }
    }

    /**
     * Parses {@link #DATA} row by row and publishes one event per row.
     * Rows with fewer than five fields, or with a non-numeric value in one of
     * the three integer columns, are logged and skipped so that a single bad
     * row does not abort the remaining rows.
     *
     * @param asyncDataPublisher publisher the events are handed to
     */
    private static void publishEvents(AsyncDataPublisher asyncDataPublisher) {
        String[] dataRow = DATA.split("\n");
        for (String row : dataRow) {
            String[] data = row.split(",");
            if (data.length < FIXED_FIELD_COUNT) {
                logger.warn("Skipping row with fewer than " + FIXED_FIELD_COUNT + " fields: " + row);
                continue;
            }
            Object[] payload;
            try {
                payload = new Object[]{
                        data[0], data[1], Integer.parseInt(data[2]),
                        Integer.parseInt(data[3]), Integer.parseInt(data[4])
                };
            } catch (NumberFormatException e) {
                logger.warn("Skipping row with a non-numeric field: " + row, e);
                continue;
            }
            HashMap<String, String> map = new HashMap<String, String>();
            //Calling location information included.
            if (data.length == FIELD_COUNT_WITH_LOCATION) {
                map.put("location", data[5]);
            }
            Event event = eventObject(null, new Object[]{"10.100.3.173"}, payload, map);
            try {
                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
            } catch (AgentException e) {
                logger.error("Failed to publish event", e);
            }
        }
    }

    /**
     * Builds an {@link Event} from its four data sections.
     *
     * @param correlationData correlation values, passed to {@code setCorrelationData}
     * @param metaData        metadata values, passed to {@code setMetaData}
     * @param payLoadData     payload values, passed to {@code setPayloadData}
     * @param map             custom key-value pairs, passed to {@code setArbitraryDataMap}
     * @return the populated event
     */
    private static Event eventObject(Object[] correlationData, Object[] metaData,
                                     Object[] payLoadData, HashMap<String, String> map) {
        Event event = new Event();
        event.setCorrelationData(correlationData);
        event.setMetaData(metaData);
        event.setPayloadData(payLoadData);
        event.setArbitraryDataMap(map);
        return event;
    }
}

After publishing the call center data, you can query it with variable keys using Hive scripts.

...

This is the result of the query:

name | day | time to respond
William | Mon | 10
Sophia | Mon | 36
Emma | Mon | 40