Publishing Binary/Thrift Data using Java Client
Introduction to data publisher
A data publisher allows you to send data to a predefined set of data fields in a DAS/CEP server. The data structure with predefined fields is defined in an event stream. The data is converted to the format defined by the event stream and sent via the WSO2 data-bridge component. You can also send custom key-value pairs with data events.
Custom fields with data stream
The data bridge data agent has a map data structure that enables you to send an arbitrary number of string key-value pairs. The other data structures are the three object arrays corresponding to the key-value pairs of metadata, correlation data, and payload data of fixed stream definitions. You can change the key-value pairs in the map data structure from message to message, but they all should be of the string
data type.
You can put the data types of these custom key-value pairs into three groups according to the transmission category.
- When the key starts with
meta
: The data field is considered as a metadata custom field. It is sent with metadata and referred asmeta_
key prefix. - When the key starts with
correlation
: The data field is considered as a correlation data custom field. It is sent with correlation data and referred ascorrelation_
key prefix. - When the key starts with
payload
or any other string: The data field is considered as a payload data custom field. It is sent with payload data and saved in referred as key prefix.
Dependencies
In order to publish data to WSO2 DAS/CEP through a custom data agent, you need to have the following dependencies. You can configure the dependencies either using the class path or using the POM file.
Adding dependencies using class path
Add the JAR files listed below to your class path. Note that ${carbon.analytics-commons.version}
refers to the version of the carbon analytics commons github repository - https://github.com/wso2/carbon-analytics-common/. It is always recommended to use the jar file from the latest released version.
org.wso2.carbon.logging_4.3.0.jar
commons-pool-1.5.6.wso2v1.jar
google-collect_1.0.0.wso2v2.jar
org.wso2.carbon.utils_4.3.0.jar
org.wso2.carbon.base_4.3.0.jar
axiom_1.2.11.wso2v5.jar
httpclient-4.2.5.wso2v1.jar
libthrift-0.7.0.wso2v2.jar
slf4j.log4j12-1.6.1.jar
slf4j.api-1.6.1.jar
org.wso2.carbon.databridge.agent-${carbon.commons.version}.jar
org.wso2.carbon.databridge.commons.${carbon.commons.version}.jar
org.wso2.carbon.databridge.commons-${carbon.commons.version}.jar
disruptor-2.10.4.wso2v2.jar
Adding dependencies using POM file
${carbon.analytics-common.version}
refers to the version of the carbon-analytics-common github repository - https://github.com/wso2/carbon-analytics-common/. It is always recommended to use the dependency entry from the latest released version.
<repositories> <repository> <id>wso2-nexus</id> <name>WSO2 internal Repository</name> <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url> <releases> <enabled>true</enabled> <updatePolicy>daily</updatePolicy> <checksumPolicy>ignore</checksumPolicy> </releases> </repository> <repository> <id>wso2.releases</id> <name>WSO2 internal Repository</name> <url>http://maven.wso2.org/nexus/content/repositories/releases/</url> <releases> <enabled>true</enabled> <updatePolicy>daily</updatePolicy> <checksumPolicy>ignore</checksumPolicy> </releases> </repository> <repository> <id>wso2.snapshots</id> <name>Apache Snapshot Repository</name> <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url> <snapshots> <enabled>true</enabled> <updatePolicy>daily</updatePolicy> </snapshots> <releases> <enabled>false</enabled> </releases> </repository> </repositories>
<dependency> <groupId>org.wso2.carbon.analytics-common</groupId> <artifactId>org.wso2.carbon.databridge.agent</artifactId> <version>${carbon.analytics-common.version}</version> </dependency> <dependency> <groupId>org.wso2.carbon.analytics-common</groupId> <artifactId>org.wso2.carbon.databridge.commons</artifactId> <version>${carbon.analytics-common.version}</version> </dependency> <dependency> <groupId>org.wso2.carbon.analytics-common</groupId> <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId> <version>${carbon.analytics-common.version}</version> </dependency> <dependency> <groupId>org.wso2.carbon.analytics-common</groupId> <artifactId>org.wso2.carbon.databridge.commons.binary</artifactId> <version>${carbon.analytics-common.version}</version> </dependency>
Configuring the data agent
A data agent is a single controller for all types of data publishers created. Data publishers share resources such as client pool etc. with one data agent. Thrift data agent is available by default. You can also extend and write a new data agent such as a binary data agent.
Follow the steps below to configure a data agent.
Load the following sample configurations and properties to define the data agent in the JVM.
<DataAgentsConfiguration> <Agent> <Name>Thrift</Name> <DataEndpointClass>org.wso2.carbon.databridge.agent.internal.endpoint.thrift.ThriftDataEndpoint</DataEndpointClass> <TrustSore>src/main/resources/client-truststore.jks</TrustSore> <TrustSorePassword>wso2carbon</TrustSorePassword> <QueueSize>32768</QueueSize> <BatchSize>200</BatchSize> <CorePoolSize>5</CorePoolSize> <MaxPoolSize>10</MaxPoolSize> <KeepAliveTimeInPool>20</KeepAliveTimeInPool> <ReconnectionInterval>30</ReconnectionInterval> <MaxTransportPoolSize>250</MaxTransportPoolSize> <MaxIdleConnections>250</MaxIdleConnections> <EvictionTimePeriod>5500</EvictionTimePeriod> <MinIdleTimeInPool>5000</MinIdleTimeInPool> <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize> <SecureMaxIdleConnections>250</SecureMaxIdleConnections> <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod> <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool> </Agent> <Agent> <Name>Binary</Name> <DataEndpointClass>org.wso2.carbon.databridge.agent.internal.endpoint.binary.BinaryDataEndpoint </DataEndpointClass> <TrustSore>src/main/resources/client-truststore.jks</TrustSore> <TrustSorePassword>wso2carbon</TrustSorePassword> <QueueSize>32768</QueueSize> <BatchSize>200</BatchSize> <CorePoolSize>5</CorePoolSize> <MaxPoolSize>10</MaxPoolSize> <KeepAliveTimeInPool>20</KeepAliveTimeInPool> <ReconnectionInterval>30</ReconnectionInterval> <MaxTransportPoolSize>250</MaxTransportPoolSize> <MaxIdleConnections>250</MaxIdleConnections> <EvictionTimePeriod>5500</EvictionTimePeriod> <MinIdleTimeInPool>5000</MinIdleTimeInPool> <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize> <SecureMaxIdleConnections>250</SecureMaxIdleConnections> <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod> <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool> </Agent> </DataAgentsConfiguration>
To configure the above parameters in the
<CEP_HOME>/repository/conf/data-bridge/data-agent-conf.xml
file in order to tune performance, follow the instructions in WSO2 event configuration.- Instantiate the data publisher as follows:
AgentHolder. setConfigPath(“/path/to/data/agent/conf.xml”)
Set trustStore
File securityFile = new File(<CEP_HOME> + "repository" + File.separator + "resources" + File.separator +"security"); String trustStore = securityFile.getAbsolutePath(); System.setProperty("javax.net.ssl.trustStore", trustStore + "" + File.separator + "client-truststore.jks"); System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
The key also the can be found at https://github.com/wso2/carbon-kernel/tree/master/distribution/kernel/carbon-home/repository/resources/security
Instantiate and use the data publisher using one of the following configurations:
DataPublisher dataPublisher = new DataPublisher(url, username, password);
DataPublisher dataPublisher = new DataPublisher(receiverURLSet, username, password);
DataPublisher dataPublisher = new DataPublisher(receiverURLSet,authURLSet, username, password);
For information on the
receiverURLSet
andauthURLSet
parameters of the above configuration, see Setting up Multi Receiver and Load Balancing Data Agent. And similarly if you are passing an receiverURLSet as tcp://localhost:7611|tcp://localhost:7612|tcp://localhost:7613, then the corresponding authentication URL set will be ssl://localhost:7711|ssl://localhost:7712|ssl://localhost:7713.In all the above methods, the default data agent (which is configured as first Agent element in the above configuration) will be used to create the data publishers. If you have configured only the Thrift data agent in the
<CEP_HOME>/repository/conf/data-bridge/data-agent-conf.xml
file, then this will provide you a Thirft-based data publisher instance.However, if you have configured more types of data agents in the
<CEP_HOME>/repository/conf/data-bridge/data-agent-conf.xml
file (Eg: Binary Agent in the above sample data-agent-conf.xml ), then you can pass an additional property namedtype
, which denotes the type of data publisher that needs to be created. For example, if you have a binary data publisher, then you can passbinary
as the type to get the binary data publisher Instance as shown below.DataPublisher dataPublisher = new DataPublisher(String type, String receiverURLSet, String authURLSet, String username, String password);
Data publisher sample
As a prerequisite for this sample, you need to define the event streams and a WSO2Event receiver in the server (WSO2 DAS/CEP).
Follow the procedure below to use the data publisher.
Initialize the data publisher as follows.
AgentHolder. setConfigPath ( getDataAgentConfigPath ()); DataPublisher dataPublisher = new DataPublisher(url, username, password);
Generate the stream ID for the stream from which you are going to publish the event as follows.
String streamId = DataBridgeCommonsUtils.generateStreamId(HTTPD_LOG_STREAM, VERSION);
- Publish the events using any of the following methods.
In the following configuration, the published event is blocked being called until the event is put into a disruptor. If the disruptor is full it will wait until there is a free space.
Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null, new Object[]{aLog}); dataPublisher.publish(event);
Try publish as shown in the following configuration, is a non-blocking publishing. If there is a space available in the disruptor, it will try to insert the event. However, if the disruptor is full, the event is returned back immediately without waiting.
Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null, new Object[]{aLog}); dataPublisher.tryPublish(event);
Try publish as shown in the following configuration, is a non-blocking publishing with timeout in mili seconds. if there is a space available in the disruptor it will try to insert the event, but if the disruptor is full it will wait for the specified amount of time, and if the timeout is reached the event is returned back.
Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null, new Object[]{aLog}); dataPublisher.tryPublish(event, 100);
For more information on the usage of data publishers, see the sample in the
<CEP_HOME>/samples/cep/producers/wso2-event/
directory.