Writing Analytics
WSO2 IoT Server uses WSO2 Data Analytics Server (WSO2 DAS) to write batch analytics and process historical sensor data. Your device will have one or more sensors to gather data. For example, the IRS+ Drone has sensors to identify its current location, velocity, and battery charge percentage.
Before you begin, you need to have an understanding of the IoTS analytics framework. For more information, see WSO2 IoT Server Analytics. Further, let's get an understanding of the DAS artifacts that each sensor in WSO2 IoTS needs to have.
- Event receivers
WSO2 DAS receives data published by agents or sensors through Event Receivers. Each event receiver is associated with an event stream, which you then persist and/or process using an event executor. WSO2 DAS supports many transport types as entry protocols for Event Receivers. For more information on Event Receivers, see Configuring Event Receivers in the WSO2 DAS documentation.
- Event streams
The data received from the receiver are then converted to event streams.
- Event Store
Event Store is used to store events that are published directly to the DAS. You need to have the event store unit if you want to analyze the historical data received by the device sensors.
- Spark scripts
The main analytics engine of WSO2 DAS is based on Apache Spark. It performs batch analytics operations on the data stored in Event Stores using analytics scripts written in Spark SQL.
- Execution plan
WSO2 DAS uses a real-time event processing engine which is based on Siddhi. For more information on real-time analytics using Siddhi, see the WSO2 DAS documentation on Realtime Analytics Using Siddhi.
- Event publisher
Output data from either Spark scripts or Siddhi is published from the DAS using event publishers. Event publishers support various transport protocols.
If you wish to use only real-time analytics for your device type, you require only the Event Receiver, Event Stream, and Execution Plan artifacts.
If you wish to analyze the historical data of the device, you require the Event Store and Spark script artifacts too.
Sample implementation
Step 1: Creating the DAS Carbon Application
Let's take a look at how the analytics were written for the Raspberry Pi device type and how they are structured to create a DAS Composite Application (DAS C-App). The DAS C-App defines the artifacts and ships them to WSO2 DAS as an archive. The sample DAS C-App folder structure is as follows:
- Creating the event stream artifact.
Create the stream format JSON file using a unique name, such as org.wso2.iot.raspberrypi_1.0.0, to stream the temperature data.
The stream JSON definition consists of 2 main sections named metaData and payloadData. Make sure to only modify the payloadData section to suit your requirement, as it contains the data that will be published.
Example:
Stream JSON format to gather the temperature data
    {
      "name": "org.wso2.iot.raspberrypi",
      "version": "1.0.0",
      "nickName": "raspberrypi",
      "description": "Temperature data received from the raspberrypi",
      "metaData": [
        {"name": "owner", "type": "STRING"},
        {"name": "deviceId", "type": "STRING"},
        {"name": "time", "type": "LONG"}
      ],
      "payloadData": [
        {"name": "temperature", "type": "FLOAT"}
      ]
    }
Define the event stream as an artifact by configuring the artifact.xml file.
Example:
Sample artifact.xml
    <artifact name="raspberrypi_stream" version="1.0.0" type="event/stream" serverRole="DataAnalyticsServer">
        <file>org.wso2.iot.raspberrypi_1.0.0.json</file>
    </artifact>
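The stream definition determines three names that the later artifacts reuse: the JSON file name referenced in artifact.xml, the name:version identifier used by the execution plan, and the attribute names, where metaData attributes carry a meta_ prefix. The following Python sketch derives all three from the definition shown above. It is an illustration only; the helper names (stream_file_name, stream_id, flattened_attributes) are hypothetical and not part of WSO2 DAS.

```python
import json

# Trimmed copy of the stream definition shown above.
STREAM_DEFINITION = json.loads("""
{
  "name": "org.wso2.iot.raspberrypi",
  "version": "1.0.0",
  "metaData": [
    {"name": "owner", "type": "STRING"},
    {"name": "deviceId", "type": "STRING"},
    {"name": "time", "type": "LONG"}
  ],
  "payloadData": [
    {"name": "temperature", "type": "FLOAT"}
  ]
}
""")


def stream_file_name(definition):
    """File name referenced by the <file> element of the stream artifact.xml."""
    return "{}_{}.json".format(definition["name"], definition["version"])


def stream_id(definition):
    """name:version identifier, as used by @Import and @Export in an execution plan."""
    return "{}:{}".format(definition["name"], definition["version"])


def flattened_attributes(definition):
    """(name, type) pairs as an execution plan refers to them.

    metaData attributes get a "meta_" prefix; payloadData attributes keep their names.
    """
    attributes = []
    for attribute in definition.get("metaData", []):
        attributes.append(("meta_" + attribute["name"], attribute["type"].lower()))
    for attribute in definition.get("payloadData", []):
        attributes.append((attribute["name"], attribute["type"].lower()))
    return attributes


if __name__ == "__main__":
    print(stream_file_name(STREAM_DEFINITION))
    print(stream_id(STREAM_DEFINITION))
    for name, type_name in flattened_attributes(STREAM_DEFINITION):
        print(name, type_name)
```

Running a check like this before packaging can catch a mismatch between the stream file name, the artifact.xml entry, and the stream declared in the execution plan.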
- Creating the execution plan artifact to convert and process the received data.
Create a STORM-based distributed execution plan. For more information, see the WSO2 DAS documentation on Creating a STORM Based Distributed Execution Plan.
Example:
    /* Enter a unique ExecutionPlan */
    @Plan:name('raspberrypi_execution')

    /* Enter a unique description for ExecutionPlan */
    -- @Plan:description('raspberrypi_execution')

    /* define streams/tables and write queries here ... */

    @Import('org.wso2.iot.raspberrypi:1.0.0')
    define stream raspberrypi (meta_owner string, meta_deviceId string, meta_time long, temperature float);

    @Export('org.wso2.iot.devices.temperature:1.0.0')
    define stream temperature (meta_owner string, meta_deviceType string, meta_deviceId string, meta_time long, temperature float);

    from raspberrypi
    select meta_owner, 'raspberrypi' as meta_deviceType, meta_deviceId, meta_time * 1000 as meta_time, temperature
    insert into temperature;
Define the execution/processing unit as an artifact by configuring the artifact.xml file.
Example:
    <artifact name="raspberrypi_execution" version="1.0.0" type="event/execution-plan" serverRole="DataAnalyticsServer">
        <file>raspberrypi_execution.siddhiql</file>
    </artifact>
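To make the effect of the query in the execution plan easier to follow, the Python sketch below applies the same per-event transformation: it copies the owner, device ID, and temperature, adds a fixed meta_deviceType, and multiplies meta_time by 1000. The multiplication suggests a conversion from seconds to milliseconds, but that is an assumption; the source does not state the unit. The function name and the sample values are hypothetical.

```python
def to_temperature_event(event, device_type="raspberrypi"):
    """Mirror the select clause of the query for a single event.

    `event` uses the attribute names of the imported raspberrypi stream.
    """
    return {
        "meta_owner": event["meta_owner"],
        # Constant added by the query: 'raspberrypi' as meta_deviceType
        "meta_deviceType": device_type,
        "meta_deviceId": event["meta_deviceId"],
        # meta_time * 1000 as meta_time (assumed: seconds to milliseconds)
        "meta_time": event["meta_time"] * 1000,
        "temperature": event["temperature"],
    }


if __name__ == "__main__":
    # Hypothetical sample event; the values are made up for illustration.
    sample = {
        "meta_owner": "admin",
        "meta_deviceId": "device01",
        "meta_time": 1500000000,
        "temperature": 24.5,
    }
    print(to_temperature_event(sample))
```

The output event has the shape of the exported org.wso2.iot.devices.temperature stream, which carries one more attribute (meta_deviceType) than the imported stream.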
- Creating the event receiver artifact.
- Create an XML file containing the details on binding the stream to the table using receivers.
Example:
    <eventReceiver name="raspberrypi_receiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
        <from eventAdapterType="oauth-mqtt">
            <property name="topic">carbon.super/raspberrypi/+/temperature</property>
            <property name="username">admin</property>
            <property name="contentValidator">org.wso2.carbon.device.mgt.input.adapter.mqtt.util.MQTTContentValidator</property>
            <property name="contentTransformer">default</property>
            <property name="dcrUrl">https://${iot.core.host}:${iot.core.https.port}/dynamic-client-web/register</property>
            <property name="url">tcp://${mqtt.broker.host}:${mqtt.broker.port}</property>
            <property name="cleanSession">true</property>
        </from>
        <mapping customMapping="disable" type="json"/>
        <to streamName="org.wso2.iot.raspberrypi" version="1.0.0"/>
    </eventReceiver>
Define the event receiver as an artifact by configuring the artifact.xml file.
Example:
    <artifact name="raspberrypi_receiver" version="1.0.0" type="event/receiver" serverRole="DataAnalyticsServer">
        <file>raspberrypi_receiver.xml</file>
    </artifact>
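The receiver subscribes to the MQTT topic filter carbon.super/raspberrypi/+/temperature. In MQTT, + is a single-level wildcard, so the filter matches exactly one topic level in that position (presumably the device ID, although the source does not say so). The Python sketch below shows which topics such a filter accepts. It is a simplified matcher written for illustration, not the broker's implementation, and the device ID device01 is hypothetical.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a filter that may contain + and #.

    Simplified: it ignores the special handling of topics that start with "$".
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without a "#", the filter and the topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    receiver_filter = "carbon.super/raspberrypi/+/temperature"
    for candidate in (
        "carbon.super/raspberrypi/device01/temperature",
        "carbon.super/raspberrypi/device01/humidity",
        "carbon.super/raspberrypi/temperature",
    ):
        print(candidate, topic_matches(receiver_filter, candidate))
```

A device agent therefore has to publish to a topic with exactly four levels that ends in temperature for its events to reach this receiver.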
Create a deployable artifact that bundles all the artifacts created above into one archive.
Example:
    <artifacts>
        <artifact name="raspberrypi" version="1.0.0" type="carbon/application">
            <dependency artifact="raspberrypi_stream" version="1.0.0" include="true" serverRole="DataAnalyticsServer"/>
            <dependency artifact="raspberrypi_receiver" version="1.0.0" include="true" serverRole="DataAnalyticsServer"/>
            <dependency artifact="raspberrypi_execution" version="1.0.0" include="true" serverRole="DataAnalyticsServer"/>
        </artifact>
    </artifacts>
Configure the build.xml for analytics.
Example:
    <project name="create-raspberrypi-capps" default="zip" basedir=".">
        <property name="project-name" value="${ant.project.name}"/>
        <property name="target-dir" value="target/carbonapps"/>
        <property name="src-dir" value="src/main/resources/carbonapps"/>
        <property name="Raspberrypi_dir" value="raspberrypi"/>

        <target name="clean">
            <delete dir="${target-dir}" />
        </target>

        <target name="zip" depends="clean">
            <mkdir dir="${target-dir}"/>
            <zip destfile="${target-dir}/${Raspberrypi_dir}.car">
                <zipfileset dir="${src-dir}/${Raspberrypi_dir}"/>
            </zip>
        </target>
    </project>
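The zip target in build.xml simply archives the contents of the C-App source folder into a .car file. For readers who want to see that step without Ant, the following Python sketch does roughly the same thing with the standard zipfile module. It is not a replacement for the build file: unlike the clean target, it does not delete the target directory first, and the function name build_car is hypothetical.

```python
import os
import zipfile


def build_car(source_dir, target_dir, app_name):
    """Zip the contents of source_dir/app_name into target_dir/app_name.car.

    Returns the path of the archive that was written.
    """
    app_dir = os.path.join(source_dir, app_name)
    os.makedirs(target_dir, exist_ok=True)
    car_path = os.path.join(target_dir, app_name + ".car")
    with zipfile.ZipFile(car_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(app_dir):
            for file_name in sorted(files):
                full_path = os.path.join(root, file_name)
                # Store entries relative to the C-App folder so that the
                # deployable artifact descriptor sits at the archive root.
                archive.write(full_path, os.path.relpath(full_path, app_dir))
    return car_path
```

With the directory values from the build.xml above, the call would be build_car("src/main/resources/carbonapps", "target/carbonapps", "raspberrypi"), which writes target/carbonapps/raspberrypi.car.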
Step 2: Publishing data to WSO2 DAS
Once the DAS C-App is created you need to deploy it. Follow any of the methods given below to publish the historical data retrieved by the sensors, depending on your environment.
Developer/Testing environment
WSO2 IoTS is prepackaged with WSO2 DAS features. Therefore, the data gathered from the sensors are published to WSO2 DAS when the DAS C-App is deployed in WSO2 IoTS.
Follow the steps given below:
- Navigate to the <IoT_HOME>/broker/bin directory and start the IoT Server broker profile.
    cd <IoT_HOME>/broker/bin
    ./wso2server.sh
- Navigate to the <IoT_HOME>/core/bin directory and start the IoT Server core profile.
    cd <IoT_HOME>/core/bin
    ./wso2server.sh
- Navigate to the <IoT_HOME>/analytics/bin directory and start the IoT Server analytics profile.
    cd <IoT_HOME>/analytics/bin
    ./wso2server.sh
Production environment
In a production environment, deploying the DAS C-App on WSO2 IoTS is not sufficient. You also need to configure WSO2 DAS to integrate with WSO2 IoTS.
Step 3: Configuring WSO2 IoTS to generate graphs
To generate graphs based on the data gathered from the sensor, you need to configure WSO2 IoTS as explained below:
- Navigate to the analytics-data-config.xml file that is in the <IoTS_HOME>/conf/analytics directory. Configure the fields under the <AnalyticsDataConfiguration> tag. Each field is described in the analytics-data-config.xml file.
Example:
    <AnalyticsDataConfiguration>
        <Mode>REMOTE</Mode>
        <URL>http://10.10.10.345:9765</URL>
        <Username>admin_username</Username>
        <Password>admin_password</Password>
        <MaxConnections>200</MaxConnections>
        <MaxConnectionsPerRoute>200</MaxConnectionsPerRoute>
        <SocketConnectionTimeout>60000</SocketConnectionTimeout>
        <ConnectionTimeout>60000</ConnectionTimeout>
    </AnalyticsDataConfiguration>
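Before restarting the server, it can help to sanity-check the values you entered. The Python sketch below reads the fields of an <AnalyticsDataConfiguration> fragment and reports obvious problems, such as a malformed URL or a non-numeric timeout. It assumes the fragment is the root element of the text it is given, and the function names are hypothetical. Note that the address 10.10.10.345 in the example above is a placeholder: its last octet exceeds 255, so replace it with the address of your own WSO2 DAS node.

```python
import ipaddress
import xml.etree.ElementTree as ET
from urllib.parse import urlparse

NUMERIC_FIELDS = (
    "MaxConnections",
    "MaxConnectionsPerRoute",
    "SocketConnectionTimeout",
    "ConnectionTimeout",
)


def read_analytics_config(xml_text):
    """Return the child elements of <AnalyticsDataConfiguration> as a dict."""
    root = ET.fromstring(xml_text)
    return {child.tag: (child.text or "").strip() for child in root}


def check_analytics_config(config):
    """Return a list of human-readable problems found in the configuration."""
    problems = []
    url = urlparse(config.get("URL", ""))
    host = url.hostname or ""
    if url.scheme not in ("http", "https") or not host:
        problems.append("URL must be an http(s) URL with a host")
    elif all(part.isdigit() for part in host.split(".")):
        # The host looks like a dotted IPv4 address, so validate it as one.
        try:
            ipaddress.ip_address(host)
        except ValueError:
            problems.append("URL host is not a valid IP address: " + host)
    for field in NUMERIC_FIELDS:
        if not config.get(field, "").isdigit():
            problems.append(field + " must be a non-negative integer")
    return problems


if __name__ == "__main__":
    sample = """<AnalyticsDataConfiguration>
        <Mode>REMOTE</Mode>
        <URL>http://10.10.10.345:9765</URL>
        <MaxConnections>200</MaxConnections>
        <MaxConnectionsPerRoute>200</MaxConnectionsPerRoute>
        <SocketConnectionTimeout>60000</SocketConnectionTimeout>
        <ConnectionTimeout>60000</ConnectionTimeout>
    </AnalyticsDataConfiguration>"""
    for problem in check_analytics_config(read_analytics_config(sample)):
        print(problem)
```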