Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleSample agent
package org.wso2.carbon.bam.sample;

import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.commons.Event;
import java.util.HashMap;

public class VariableKeyValueAgent {
    private static Logger logger = Logger.getLogger(VariableKeyValueAgent.class);
    public static final String CALL_CENTER_DATA_STREAM = "callcenter_stream";
    public static final String VERSION = "1.0.0";
    public static final String DATA = "Sophia,Mon,36,1,0,Colombo\n" +
                                      "Jacob,Mon,55,0,1,Horana\n" +
                                      "Mason,Mon,150,0,1\n" +
                                      "William,Mon,10,1,0,Colombo\n" +
                                      "Jayden,Mon,15,1,0,Galle\n" +
                                      "Michael,Mon,25,1,0\n" +
                                      "Emma,Mon,40,1,0,Colombo";
    public static void main(String[] args) {
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        System.setProperty("javax.net.ssl.trustStore", "/mnt/windows/Users/chamith/Desktop/share/releases/bam2/230/wso2bam-2.3.0/repository/resources/security/client-truststore.jks");        
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        Agent agent = new Agent(agentConfiguration);
        //Using Asynchronous data publisher
        AsyncDataPublisher asyncDataPublisher = new AsyncDataPublisher("tcp://localhost:7611", "admin", "admin", agent);
        String streamDefinition = "{" +
                                  " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
                                  " 'version':'" + VERSION + "'," +
                                  " 'nickName': 'Phone_Retail_Shop'," +
                                  " 'description': 'Phone Sales'," +
                                  " 'metaData':[" +
                                  " {'name':'publisherIP','type':'STRING'}" +
                                  " ]," +
                                  " 'payloadData':[" +
                                  " {'name':'name','type':'STRING'}," +
                                  " {'name':'day','type':'STRING'}," +
                                  " {'name':'timeToRespond','type':'INT'}," +
                                  " {'name':'answered','type':'INT'}," +
                                  " {'name':'abandoned','type':'INT'}" +
                                  " ]" +
                                  "}";
        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
        publishEvents(asyncDataPublisher);
    }
    private static void publishEvents(AsyncDataPublisher asyncDataPublisher) {
        String[] dataRow = DATA.split("\n");
        for (String row : dataRow) {
            String[] data = row.split(",");
            Object[] payload = new Object[]{
                    data[0], data[1], Integer.parseInt(data[2]),
                    Integer.parseInt(data[3]), Integer.parseInt(data[4])
            };
            HashMap<String, String> map = new HashMap<String, String>();
            //Calling location information included.
            if (data.length == 6) {
                map.put("location", data[5]);
            }
            Event event = eventObject(null, new Object[]{"10.100.3.173"}, payload, map);
            try {
                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
            } catch (AgentException e) {
                logger.error("Failed to publish event", e);
            }
        }
    }
    private static Event eventObject(Object[] correlationData, Object[] metaData,
                                     Object[] payLoadData, HashMap<String, String> map) {
        Event event = new Event();
        event.setCorrelationData(correlationData);
        event.setMetaData(metaData);
        event.setPayloadData(payLoadData);
        event.setArbitraryDataMap(map);
        return event;
    }
}

...