Introduction to data publisher
A data publisher allows you to send data to a predefined set of data fields in a DAS/CEP server. The data structure with predefined fields is defined in an event stream definition (/wiki/spaces/TESB/pages/32604253). The data is converted to the format defined by that event stream definition and sent via the WSO2 data-bridge component. You can also send custom key-value pairs with data events.
Custom fields with data stream
The data bridge data agent has a map data structure that enables you to send an arbitrary number of string key-value pairs. The other data structures are the three object arrays that hold the metadata, correlation data, and payload data of fixed stream definitions. You can change the key-value pairs in the map from message to message, but all keys and values must be of the string data type.
These custom key-value pairs fall into three groups according to the transmission category.
- When the key starts with meta: The data field is considered a metadata custom field. It is sent with metadata and referred to with the meta_ key prefix.
- When the key starts with correlation: The data field is considered a correlation data custom field. It is sent with correlation data and referred to with the correlation_ key prefix.
- When the key starts with payload or any other string: The data field is considered a payload data custom field. It is sent with payload data and referred to by its key prefix.
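The grouping rule above can be sketched in plain Java. This is only an illustration of the prefix rule as described here, not the data bridge's own code. The class name CustomFieldGrouping, the Category enum, and the sample keys are hypothetical.

```java
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: groups custom string key-value pairs by key prefix. */
final class CustomFieldGrouping {

    /** The three transmission categories described in the text. */
    enum Category { META, CORRELATION, PAYLOAD }

    /** Keys starting with "meta" or "correlation" get their own group; every other key is payload. */
    static Category categoryOf(String key) {
        if (key.startsWith("meta")) {
            return Category.META;
        }
        if (key.startsWith("correlation")) {
            return Category.CORRELATION;
        }
        return Category.PAYLOAD;
    }

    /** Splits one map of custom fields into one map per category, keeping insertion order. */
    static Map<Category, Map<String, String>> group(Map<String, String> customFields) {
        Map<Category, Map<String, String>> grouped = new EnumMap<>(Category.class);
        for (Category category : Category.values()) {
            grouped.put(category, new LinkedHashMap<>());
        }
        for (Map.Entry<String, String> entry : customFields.entrySet()) {
            grouped.get(categoryOf(entry.getKey())).put(entry.getKey(), entry.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Hypothetical custom fields; both keys and values are strings.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("meta_clientType", "external");
        fields.put("correlation_requestId", "42");
        fields.put("payload_note", "hello");
        fields.put("region", "eu"); // no known prefix, so it is treated as payload
        System.out.println(group(fields));
    }
}
```

Because the map can change from message to message, a grouping like this would be applied per event rather than once per stream.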
Dependencies
In order to publish data to WSO2 DAS/CEP through a custom data agent, you need to have the following dependencies. You can configure the dependencies either using the class path or using the POM file.
Adding dependencies using class path
Add the JAR files listed below to your class path. Note that ${carbon.commons.version} refers to the version of the carbon-commons GitHub repository (https://github.com/wso2/carbon-commons/). It is recommended to use the JAR files from the latest released version.
org.wso2.carbon.logging_4.3.0.jar
commons-pool-1.5.6.wso2v1.jar
google-collect_1.0.0.wso2v2.jar
org.wso2.carbon.utils_4.3.0.jar
org.wso2.carbon.base_4.3.0.jar
axiom_1.2.11.wso2v5.jar
httpclient-4.2.5.wso2v1.jar
libthrift-0.7.0.wso2v2.jar
slf4j.log4j12-1.6.1.jar
slf4j.api-1.6.1.jar
org.wso2.carbon.databridge.agent-${carbon.commons.version}.jar
org.wso2.carbon.databridge.commons.thrift-${carbon.commons.version}.jar
org.wso2.carbon.databridge.commons-${carbon.commons.version}.jar
disruptor-2.10.4.wso2v2.jar
Adding dependencies using POM file
${carbon.commons.version} refers to the version of the carbon-commons GitHub repository (https://github.com/wso2/carbon-commons/). It is recommended to use the dependency entry from the latest released version.
<repositories>
    <repository>
        <id>wso2.snapshots</id>
        <name>Apache Snapshot Repository</name>
        <url>http://maven.wso2.org/nexus/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
        </snapshots>
        <releases>
            <enabled>false</enabled>
        </releases>
    </repository>
</repositories>

<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.agent</artifactId>
    <version>${carbon.commons.version}</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.commons</artifactId>
    <version>${carbon.commons.version}</version>
</dependency>
<dependency>
    <groupId>org.wso2.carbon</groupId>
    <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
    <version>${carbon.commons.version}</version>
</dependency>
Configuring the data agent
A data agent is a single controller for all types of data publishers that are created. Data publishers share resources, such as the client pool, through one data agent. The Thrift data agent is available by default. You can also write a new data agent, such as a binary data agent, by extending the existing one.
Follow the steps below to configure a data agent.
Load the following sample configurations and properties to define the data agent in the JVM.
<DataAgentsConfiguration>
    <Agent>
        <Name>Thrift</Name>
        <DataEndpointClass>org.wso2.carbon.databridge.agent.internal.endpoint.thrift.ThriftDataEndpoint</DataEndpointClass>
        <TrustSore>src/main/resources/client-truststore.jks</TrustSore>
        <TrustSorePassword>wso2carbon</TrustSorePassword>
        <QueueSize>32768</QueueSize>
        <BatchSize>10000</BatchSize>
        <ReconnectionInterval>30</ReconnectionInterval>
        <MaxTransportPoolSize>250</MaxTransportPoolSize>
        <MaxIdleConnections>250</MaxIdleConnections>
        <EvictionTimePeriod>5500</EvictionTimePeriod>
        <MinIdleTimeInPool>5000</MinIdleTimeInPool>
        <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize>
        <SecureMaxIdleConnections>250</SecureMaxIdleConnections>
        <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod>
        <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool>
    </Agent>
    <Agent>
        <Name>Binary</Name>
        <DataEndpointClass>org.wso2.carbon.databridge.agent.internal.endpoint.binary.BinaryDataEndpoint</DataEndpointClass>
        <TrustSore>src/main/resources/client-truststore.jks</TrustSore>
        <TrustSorePassword>wso2carbon</TrustSorePassword>
        <QueueSize>32768</QueueSize>
        <BatchSize>10000</BatchSize>
        <ReconnectionInterval>30</ReconnectionInterval>
        <MaxTransportPoolSize>250</MaxTransportPoolSize>
        <MaxIdleConnections>250</MaxIdleConnections>
        <EvictionTimePeriod>5500</EvictionTimePeriod>
        <MinIdleTimeInPool>5000</MinIdleTimeInPool>
        <SecureMaxTransportPoolSize>250</SecureMaxTransportPoolSize>
        <SecureMaxIdleConnections>250</SecureMaxIdleConnections>
        <SecureEvictionTimePeriod>5500</SecureEvictionTimePeriod>
        <SecureMinIdleTimeInPool>5000</SecureMinIdleTimeInPool>
    </Agent>
</DataAgentsConfiguration>
To configure the above parameters in the <CEP_HOME>/repository/conf/data-bridge/data-agent-conf.xml file in order to tune performance, follow the instructions in WSO2 event configuration.
Before instantiating the data publisher, set the path to the data agent configuration file as follows:
AgentHolder.setConfigPath("/path/to/data/agent/conf.xml");
Instantiate and use the data publisher using one of the following configurations:
DataPublisher dataPublisher = new DataPublisher(url, username, password);
DataPublisher dataPublisher = new DataPublisher(receiverURLSet, username, password);
DataPublisher dataPublisher = new DataPublisher(receiverURLSet, authURLSet, username, password);
For information on the receiverURLSet and authURLSet parameters of the above configuration, see /wiki/spaces/PS/pages/10519892.
In all the above methods, the default data agent (the one configured as the first Agent element in the above configuration) is used to create the data publishers. If you have configured only the Thrift data agent in the <CEP_HOME>/repository/conf/data-bridge/data-agent-conf.xml file, you get a Thrift-based data publisher instance.
However, if you have configured more types of data agents in the <CEP_HOME>/repository/conf/data-bridge/data-agent-conf.xml file (e.g., the Binary agent in the above sample data-agent-conf.xml), you can pass an additional property named type, which denotes the type of data publisher to create. For example, to get a binary data publisher instance, pass binary as the type, as shown below.
DataPublisher dataPublisher = new DataPublisher(type, receiverURLSet, authURLSet, username, password);
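As a rough illustration of the rule that the first Agent element acts as the default, the following sketch reads a data agent configuration with the JDK's XML parser and returns the name of the first agent. It is not how AgentHolder resolves the default agent internally. DefaultAgentLookup and defaultAgentName are hypothetical names, and the XML in main is a trimmed-down assumption that keeps only the Name elements.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Illustrative only: finds the agent that would be the default in a data agent configuration. */
final class DefaultAgentLookup {

    /** Returns the Name of the first Agent element, or null if no agent is configured. */
    static String defaultAgentName(String configXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(configXml.getBytes(StandardCharsets.UTF_8)));
            // getElementsByTagName returns matches in document order.
            NodeList agents = doc.getDocumentElement().getElementsByTagName("Agent");
            if (agents.getLength() == 0) {
                return null;
            }
            Element firstAgent = (Element) agents.item(0);
            NodeList names = firstAgent.getElementsByTagName("Name");
            return names.getLength() == 0 ? null : names.item(0).getTextContent().trim();
        } catch (Exception e) {
            // Wrap parser failures so callers do not need to handle checked exceptions.
            throw new IllegalArgumentException("Could not parse data agent configuration", e);
        }
    }

    public static void main(String[] args) {
        // Trimmed-down configuration: only the Name of each agent is kept.
        String xml = "<DataAgentsConfiguration>"
                + "<Agent><Name>Thrift</Name></Agent>"
                + "<Agent><Name>Binary</Name></Agent>"
                + "</DataAgentsConfiguration>";
        System.out.println("Default agent: " + defaultAgentName(xml));
    }
}
```

Swapping the order of the two Agent elements in the file would change which publisher type the constructors without a type argument produce, so the order is worth checking when more than one agent is configured.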
Data publisher sample
As a prerequisite for this sample, you need to define the streams in the receiver server (WSO2 DAS/CEP). For information on defining event streams, see Working with Event Streams.
Follow the procedure below to use the data publisher.
Initialize the data publisher as follows.
AgentHolder.setConfigPath(getDataAgentConfigPath());
DataPublisher dataPublisher = new DataPublisher(url, username, password);
Generate the stream ID for the stream to which you are going to publish the event as follows.
String streamId = DataBridgeCommonsUtils.generateStreamId(HTTPD_LOG_STREAM, VERSION);
Publish the events using any of the following methods.
The publish method shown below blocks the caller until the event is put into the disruptor. If the disruptor is full, the call waits until space becomes free.
Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null, new Object[]{aLog});
dataPublisher.publish(event);
The tryPublish method shown below is non-blocking. If space is available in the disruptor, it inserts the event. If the disruptor is full, the event is returned immediately without waiting.
Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null, new Object[]{aLog});
dataPublisher.tryPublish(event);
The tryPublish method with a timeout, shown below, is non-blocking publishing with a timeout in milliseconds. If space is available in the disruptor, it inserts the event. If the disruptor is full, it waits for the specified amount of time, and if the timeout is reached, the event is returned.
Event event = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null, new Object[]{aLog});
dataPublisher.tryPublish(event, 100);
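The difference between the three publishing methods can be pictured with a bounded queue from the JDK. The sketch below is an analogy only: it uses java.util.concurrent.ArrayBlockingQueue in place of the disruptor and plain strings in place of events. The PublishModes class is hypothetical, and the boolean return values are a choice made for this sketch, not a statement about the DataPublisher API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative only: a bounded queue standing in for the disruptor. */
final class PublishModes {

    private final BlockingQueue<String> buffer;

    PublishModes(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Blocking mode: waits until there is free space in the buffer. */
    void publish(String event) {
        try {
            buffer.put(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for free space", e);
        }
    }

    /** Non-blocking mode: inserts if there is space, otherwise gives up at once. */
    boolean tryPublish(String event) {
        return buffer.offer(event);
    }

    /** Timed mode: waits up to timeoutMs milliseconds for space, then gives up. */
    boolean tryPublish(String event, long timeoutMs) {
        try {
            return buffer.offer(event, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        PublishModes modes = new PublishModes(1);
        modes.publish("first"); // fits, so it returns without waiting
        System.out.println("tryPublish on a full buffer: " + modes.tryPublish("second"));
        System.out.println("tryPublish with 100 ms timeout: " + modes.tryPublish("third", 100));
    }
}
```

In this analogy, the blocking mode suits producers that must not lose events, while the two non-blocking modes let the caller decide what to do with an event that could not be queued.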
For more information on the usage of data publishers, see the sample in the <CEP_HOME>/samples/producers/wso2-event/ directory.