Unknown macro: {next_previous_link3}
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

 Custom fields with data stream

Data publishers generally allow you to send data to a predefined set of data fields, to the BAM/CEP server only specified in the stream definitions. This is a set of fixed data fields you send through a data bridge. You can also send custom key-value pairs with data events.

The data bridge data agent has a Map data structure that enables you to send an arbitrary number of String key-value pairs. The other data structures are the three Object Arrays corresponding to the key-value pairs of metadata, correlation data and payload data of fixed Stream definitions. You can change the key-value pairs in the Map data structure from message to message, but they all should be of type String.

You can put the data types of these custom key value pairs into three groups according to the transmission categories.

  1. When the key starts with meta : The data field is considered as a metadatacustom field. It is sent with metadata and saved in Cassandra with key prefix meta_.
  2. When the key starts with correlation : The data field is considered as a correlation data custom field. It is sent with correlation data and saved in Cassandra with key prefix correlation_.
  3. When the key starts with payload or any other string : The data field is considered as a payload data custom field. It is sent with payload data and saved in Cassandra with key prefix payload_.

Dependencies

In order to publish data to BAM through custom data agent you need to have below dependencies.

Either you can add below jars to your class path 

  • commons-logging-1.1.1.jar
  • commons-pool-1.5.0.wso2v1.jar
  • commons-pool-1.5.jar
  • gson-2.1.jar
  • httpclient-4.1.1-wso2v1.jar
  • json-2.0.0.wso2v1.jar
  • libthrift-0.7.wso2v1.jar
  • org.apache.log4j-1.2.13.v200706111418.jar
  • org.eclipse.osgi-3.8.1.v20120830-144521.jar
  • org.eclipse.osgi.services-3.3.100.v20120522-1822.jar
  • org.wso2.carbon.bootstrap-4.2.0.jar
  • org.wso2.carbon.databridge.agent.thrift_4.2.0.jar
  • org.wso2.carbon.databridge.commons_4.2.0.jar
  • org.wso2.carbon.databridge.commons.thrift_4.2.0.jar
  • org.wso2.carbon.logging-4.2.0.jar
  • slf4j-1.5.10.wso2v1.jar
  • wrapper-3.2.3.jar
or you can add below the maven dependency entry to your pom file.

 

Maven repository
<repositories>
    <repository>
        <id>org.wso2.carbon</id>
        <url>http://dist.wso2.org/maven2/</url>
    </repository>
</repositories>
Maven pom dependency
<dependency>
	<groupId>org.wso2.carbon</groupId> 
	<artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId> 
	<version>4.2.0</version>
</dependency> 
<dependency>
	<groupId>org.wso2.carbon</groupId> 
	<artifactId>org.wso2.carbon.databridge.commons</artifactId> 
	<version>4.2.0</version> 
</dependency>

 

 

Publish and query data with variable keys

This feature is useful if you don't have a predefined format for sending data. You can use this feature to publish additional data, which are not defined in the stream definition. 

For example, assume you collecting and analyzing data from a call center. This data contains details that are always available like the caller's name, day, time to respond (s), answered and missed calls etc. Since this information is always available, it can be defined in a stream definition as follows.

Stream Definition
{ "correlationData" : "null",
  "description" : "Phone Sales",
  "metaData" : [ { "name" : "publisherIP",
        "type" : "STRING"
      } ],
  "name" : "org.wso2.carbon.bam.call.center.kpi",
  "nickName" : "Phone_Retail_Shop",
  "payloadData" : [ { "name" : "name",
        "type" : "STRING"
      },
      { "name" : "day",
        "type" : "STRING"
      },
      { "name" : "timeToRespond",
        "type" : "INT"
      },
      { "name" : "answered",
        "type" : "INT"
      },
      { "name" : "abandoned",
        "type" : "INT"
      }
    ],
  "streamId" : "org.wso2.carbon.bam.call.center.kpi:1.0.0",
  "tags" : "null",
  "version" : "1.0.0"
}

But, some information like the location of the caller is not always available. This kind of information can be published as additional information in variable keys-value pairs. You can use the data agent API to publish variable keys/values as shown in the following example.

Sample agent
package org.wso2.carbon.bam.sample;

import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Event;
import java.util.HashMap;

public class VariableKeyValueAgent {
    private static Logger logger = Logger.getLogger(VariableKeyValueAgent.class);
    public static final String CALL_CENTER_DATA_STREAM = "callcenter_stream";
    public static final String VERSION = "1.0.0";
    public static final String DATA = "Sophia,Mon,36,1,0,Colombo\n" +
                                      "Jacob,Mon,55,0,1,Horana\n" +
                                      "Mason,Mon,150,0,1\n" +
                                      "William,Mon,10,1,0,Colombo\n" +
                                      "Jayden,Mon,15,1,0,Galle\n" +
                                      "Michael,Mon,25,1,0\n" +
                                      "Emma,Mon,40,1,0,Colombo";
    public static void main(String[] args) {
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        System.setProperty("javax.net.ssl.trustStore", "/mnt/windows/Users/chamith/Desktop/share/releases/bam2/230/wso2bam-2.3.0/repository/resources/security/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        Agent agent = new Agent(agentConfiguration);
        //Using Asynchronous data publisher
        AsyncDataPublisher asyncDataPublisher = new AsyncDataPublisher("tcp://localhost:7611", "admin", "admin", agent);
        String streamDefinition = "{" +
                                  " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
                                  " 'version':'" + VERSION + "'," +
                                  " 'nickName': 'Phone_Retail_Shop'," +
                                  " 'description': 'Phone Sales'," +
                                  " 'metaData':[" +
                                  " {'name':'publisherIP','type':'STRING'}" +
                                  " ]," +
                                  " 'payloadData':[" +
                                  " {'name':'name','type':'STRING'}," +
                                  " {'name':'day','type':'STRING'}," +
                                  " {'name':'timeToRespond','type':'INT'}," +
                                  " {'name':'answered','type':'INT'}," +
                                  " {'name':'abandoned','type':'INT'}" +
                                  " ]" +
                                  "}";
        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
        publishEvents(asyncDataPublisher);
    }
    private static void publishEvents(AsyncDataPublisher asyncDataPublisher) {
        String[] dataRow = DATA.split("\n");
        for (String row : dataRow) {
            String[] data = row.split(",");
            Object[] payload = new Object[]{
                    data[0], data[1], Integer.parseInt(data[2]),
                    Integer.parseInt(data[3]), Integer.parseInt(data[4])
            };
            HashMap<String, String> map = new HashMap<String, String>();
            //Calling location information included.
            if (data.length == 6) {
                map.put("location", data[5]);
            }
            Event event = eventObject(null, new Object[]{"10.100.3.173"}, payload, map);
            try {
                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
            } catch (AgentException e) {
                logger.error("Failed to publish event", e);
            }
        }
    }
    private static Event eventObject(Object[] correlationData, Object[] metaData,
                                     Object[] payLoadData, HashMap<String, String> map) {
        Event event = new Event();
        event.setCorrelationData(correlationData);
        event.setMetaData(metaData);
        event.setPayloadData(payLoadData);
        event.setArbitraryDataMap(map);
        return event;
    }
}

After publishing the call center data, you can query it with variable keys using Hive scripts.

For example, assume you want to find the callers who call from the city Colombo. The following example Hive query shows how.

Hive Query to find the caller location
CREATE EXTERNAL TABLE IF NOT EXISTS CustomerServiceDeptStats (key STRING, name STRING,day STRING,
timeToRespond INT, answered INT, abandoned INT, value map<string,string>) STORED BY 
'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( "cassandra.host" = "127.0.0.1",
"cassandra.port" = "9160","cassandra.ks.name" = "EVENT_KS",
"cassandra.ks.username" = "admin","cassandra.ks.password" = "admin",
"cassandra.cf.name" = "org_wso2_carbon_bam_call_center_kpi",
"cassandra.columns.mapping" = ":key,payload_name,payload_day,payload_timeToRespond,payload_answered,payload_abandoned,map:" );

select name,day,timeToRespond from CustomerServiceDeptStats where value['location'] = 'Colombo';

This is the result of the query:

namedaytime to respond
WilliamMon10
SophiaMon36
EmmaMon40
  • No labels