Device Communicating with the Server
The data that your device sensors collect is sent to WSO2 Data Analytics Server (WSO2 DAS) so that you can view it in real time or as historical data. Before sending data to WSO2 DAS, you may want to encode it or convert it to a preferred format. How can you do this?
WSO2 IoT Server (WSO2 IoTS) lets you transform and validate content before sending it to WSO2 DAS. Once this is done, you can configure WSO2 IoTS to send the data to subscribed events or directly to WSO2 DAS. Take a look at the diagram given below:
Let's take a look at the two use cases given below to identify how the device communicates with the server.
Sending data to WSO2 DAS
You can write your own custom logic to transform and validate content before sending your data to WSO2 DAS. To understand how this is done, let's take a look at how the content transformer and validator are used by the analytics receiver of the RaspberryPi device type.
The MQTT transport extension is used for this purpose.
Transform the data you receive, for example by encoding it, before sending it to WSO2 DAS.
In the case of the RaspberryPi device type, it uses the default content transformer that sends the data as it is to WSO2 DAS.
If you wish to transform the content, you need to implement the ContentTransformer interface.
Example:
public class MQTTContentTransformer implements ContentTransformer {}
- Customize the content transformer to match your requirement.
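As a minimal sketch of what a customized transformer could look like, the following example Base64-encodes text payloads. The nested ContentTransformer interface is only a stand-in so that the example compiles on its own; its transform signature is modelled on the virtual fire alarm transformer shown later in this document, and the real WSO2 IoTS interface may differ. Base64ContentTransformer and the sample topic are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class ContentTransformerSketch {

    // Stand-in for the WSO2 IoTS ContentTransformer interface (assumed shape).
    interface ContentTransformer {
        Object transform(Object message, Map<String, Object> dynamicProperties);
    }

    // Hypothetical transformer that Base64-encodes text payloads.
    public static class Base64ContentTransformer implements ContentTransformer {
        @Override
        public Object transform(Object message, Map<String, Object> dynamicProperties) {
            if (!(message instanceof String)) {
                return message; // leave non-text payloads untouched
            }
            byte[] raw = ((String) message).getBytes(StandardCharsets.UTF_8);
            return Base64.getEncoder().encodeToString(raw);
        }
    }

    public static void main(String[] args) {
        ContentTransformer transformer = new Base64ContentTransformer();
        Map<String, Object> properties = new HashMap<>();
        properties.put("topic", "carbon.super/raspberrypi/device-1/temperature"); // sample topic, assumed
        System.out.println(transformer.transform("{\"temperature\":25.5}", properties));
    }
}
```

In a real plugin the class would implement the interface shipped with WSO2 IoTS instead of the stand-in declared here.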
Validate content.
Why validate?
Once the MQTT transport extension (input adapter) receives the data sent by the device, you need to verify whether the data was sent by the registered device and its user, or by a spoofer. After this is verified, you can configure WSO2 IoTS to send the content to WSO2 DAS.
In the RaspberryPi device type, the API path that is used to send the data is verified by checking if the device ID in it matches the ID of the device registered with WSO2 IoTS. You can validate the content using the MQTT, HTTP, and XMPP content validators.
Implement the ContentValidator interface, and customize it to suit your requirement.
Example: MQTTContentValidator.java
public class MQTTContentValidator implements ContentValidator {}
Customize the content validator to match the device ID and verify the user and the device.
Example: MQTTContentValidator.java
public class MQTTContentValidator implements ContentValidator {
    private static final String JSON_ARRAY_START_CHAR = "[";
    private static final Log log = LogFactory.getLog(MQTTContentValidator.class);

    @Override
    public ContentInfo validate(Object msgPayload, Map<String, String> contentValidationParams,
                                Map<String, String> dynamicParams) {
        // The device ID is taken from the MQTT topic on which the message arrived.
        String topic = dynamicParams.get(MQTTEventAdapterConstants.TOPIC);
        String[] topics = topic.split("/");
        String deviceIdJsonPath = contentValidationParams.get(MQTTEventAdapterConstants.DEVICE_ID_JSON_PATH);
        String deviceIdInTopicHierarchyLevel = contentValidationParams.get(
                MQTTEventAdapterConstants.DEVICE_ID_TOPIC_HIERARCHY_INDEX);
        // Position of the device ID within the topic hierarchy; defaults to the first level.
        int deviceIdInTopicHierarchyLevelIndex = 0;
        if (deviceIdInTopicHierarchyLevel != null && !deviceIdInTopicHierarchyLevel.isEmpty()) {
            deviceIdInTopicHierarchyLevelIndex = Integer.parseInt(deviceIdInTopicHierarchyLevel);
        }
        String deviceIdFromTopic = topics[deviceIdInTopicHierarchyLevelIndex];
        boolean status;
        String message = (String) msgPayload;
        // A payload that starts with "[" is treated as a JSON array of events.
        if (message.startsWith(JSON_ARRAY_START_CHAR)) {
            status = processMultipleEvents(message, deviceIdFromTopic, deviceIdJsonPath);
        } else {
            status = processSingleEvent(message, deviceIdFromTopic, deviceIdJsonPath);
        }
        return new ContentInfo(status, msgPayload);
    }

    // processMultipleEvents(...) and processSingleEvent(...) are not shown in this excerpt.
}
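The core check above, comparing the device ID in the topic with the device ID carried in the payload, can be sketched on its own as follows. This is an illustration under stated assumptions, not the WSO2 implementation: the payload is assumed to be a flat JSON object with a "deviceId" string field, a regular expression stands in for the JSON path lookup, and every name in the block is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DeviceIdValidationSketch {

    // Assumed payload shape: {"deviceId":"<id>", ...}. A regex replaces a real JSON path lookup.
    private static final Pattern DEVICE_ID_FIELD =
            Pattern.compile("\"deviceId\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the topic segment at the given index, or null if the topic is missing or too short.
    static String deviceIdFromTopic(String topic, int index) {
        if (topic == null) {
            return null;
        }
        String[] segments = topic.split("/");
        return (index >= 0 && index < segments.length) ? segments[index] : null;
    }

    // Returns the value of the "deviceId" field, or null if the payload does not contain one.
    static String deviceIdFromPayload(String payload) {
        if (payload == null) {
            return null;
        }
        Matcher matcher = DEVICE_ID_FIELD.matcher(payload);
        return matcher.find() ? matcher.group(1) : null;
    }

    // True only when both IDs are present and identical.
    static boolean isFromClaimedDevice(String topic, int index, String payload) {
        String fromTopic = deviceIdFromTopic(topic, index);
        return fromTopic != null && fromTopic.equals(deviceIdFromPayload(payload));
    }

    public static void main(String[] args) {
        String topic = "carbon.super/raspberrypi/device-1/temperature"; // sample topic, assumed
        System.out.println(isFromClaimedDevice(topic, 2, "{\"deviceId\":\"device-1\",\"temperature\":25.5}"));
        System.out.println(isFromClaimedDevice(topic, 2, "{\"deviceId\":\"device-2\",\"temperature\":25.5}"));
    }
}
```

Returning false for a missing ID or a topic that is too short means that malformed messages are rejected rather than causing an exception in the adapter.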
Configure the analytics receiver.
Why configure the receiver?
Once you have configured WSO2 IoTS to transform and validate content, you need to point the receiver that receives this data to the correct classes so that the content is transformed and validated accordingly. This needs to be configured in the respective <DEVICE_TYPE>_receiver.xml file.
Define the fully qualified class name of the content transformer that you implemented.
If you are using the default method provided by WSO2 IoTS, you need to define the value as default. In the default method, the data received is sent directly to WSO2 DAS without any transformation.
Example: RaspberryPi uses the default transformation method.
<property name="contentTransformer">default</property>
Define the fully qualified class name of the content validator that you implemented.
If you are using the default method provided by WSO2 IoTS, you need to define the value as default. In the default method, the data received is sent directly to WSO2 DAS without any validation.
Example: RaspberryPi uses a customized content validation method.
<property name="contentValidator">org.wso2.carbon.device.mgt.iot.input.adapter.mqtt.util.MQTTContentValidator</property>
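To make the meaning of these property values concrete, the sketch below shows one way a value such as default or a fully qualified class name could be resolved to an implementation by reflection. It only illustrates the idea and is not how WSO2 IoTS is known to load these classes; the ContentTransformer stand-in, PassThroughTransformer, UpperCaseTransformer, and resolve are all hypothetical.

```java
import java.util.Collections;
import java.util.Map;

public class ReceiverPropertySketch {

    // Stand-in for the WSO2 IoTS ContentTransformer interface (assumed shape).
    public interface ContentTransformer {
        Object transform(Object message, Map<String, Object> dynamicProperties);
    }

    // Plays the role of the "default" behaviour: the payload is passed on unchanged.
    public static class PassThroughTransformer implements ContentTransformer {
        @Override
        public Object transform(Object message, Map<String, Object> dynamicProperties) {
            return message;
        }
    }

    // A hypothetical custom transformer, referenced by its class name.
    public static class UpperCaseTransformer implements ContentTransformer {
        @Override
        public Object transform(Object message, Map<String, Object> dynamicProperties) {
            return String.valueOf(message).toUpperCase();
        }
    }

    // Maps a configured property value to a transformer instance.
    static ContentTransformer resolve(String configuredValue) {
        String value = configuredValue == null ? "" : configuredValue.trim();
        if (value.isEmpty() || "default".equals(value)) {
            return new PassThroughTransformer();
        }
        try {
            Class<?> clazz = Class.forName(value);
            return (ContentTransformer) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load content transformer: " + value, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> noProperties = Collections.emptyMap();
        System.out.println(resolve("default").transform("temp:25.5", noProperties));
        System.out.println(resolve(UpperCaseTransformer.class.getName()).transform("temp:25.5", noProperties));
    }
}
```

A misspelled class name fails fast with an exception here, which is usually easier to diagnose than silently falling back to the default behaviour.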
Sending data to subscribed events
In situations where you have a custom scenario and need to receive and publish data only via a given stream, WSO2 IoTS gives you the flexibility to send data to subscribed events. Let's take a look at our virtual fire alarm implementation to understand this clearly.
Transform the data you received, such as encoding the data. This is similar to what was explained under sending data to WSO2 DAS.
In the virtual fire alarm implementation, transforming data is implemented in the plugin directory.
Example: VirtualFirealarmMqttContentTransformer.java
public Object transform(Object message, Map<String, Object> dynamicProperties) {
    String topic = (String) dynamicProperties.get("topic");
    String[] topicParams = topic.split("/");
    String tenantDomain = topicParams[0];
    String deviceId = topicParams[2];
    JSONObject jsonPayload = new JSONObject((String) message);
    try {
        PrivilegedCarbonContext.startTenantFlow();
        PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
        ctx.setTenantDomain(tenantDomain, true);
        Integer serialNo = (Integer) jsonPayload.get(VirtualFireAlarmConstants.JSON_SERIAL_KEY);
        // the hash-code of the deviceId is used as the alias for device certificates during SCEP enrollment.
        // hence, the same is used here to fetch the device-specific-certificate from the key store.
        PublicKey clientPublicKey = VirtualFireAlarmUtils.getDevicePublicKey("" + serialNo);
        // the MQTT-messages from VirtualFireAlarm devices are in the form {"Msg":<MESSAGE>, "Sig":<SIGNATURE>}
        String actualMessage = VirtualFireAlarmUtils.extractMessageFromPayload((String) message, clientPublicKey);
        return deviceId + "," + actualMessage;
    } catch (VirtualFirealarmDeviceMgtPluginException e) {
        return "";
    } finally {
        PrivilegedCarbonContext.endTenantFlow();
    }
}
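The transformer above relies on the device's public key to extract a message that arrives together with a signature. The internals of extractMessageFromPayload are not shown in this document, so the following is only a generic sketch of signing and verifying a message with the standard java.security.Signature API. The SHA256withRSA algorithm, the Base64 encoding of the signature, and all method names are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;
import java.util.Base64;

public class SignedMessageSketch {

    private static final String ALGORITHM = "SHA256withRSA"; // assumed, not taken from WSO2 IoTS

    // Generates a throwaway RSA key pair that plays the role of the device's keys.
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // What a device might do before publishing: sign the message with its private key.
    static String sign(String message, PrivateKey privateKey) {
        try {
            Signature signer = Signature.getInstance(ALGORITHM);
            signer.initSign(privateKey);
            signer.update(message.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(signer.sign());
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // What the server side might do: check the signature against the device's public key.
    static boolean verify(String message, String base64Signature, PublicKey publicKey) {
        try {
            Signature verifier = Signature.getInstance(ALGORITHM);
            verifier.initVerify(publicKey);
            verifier.update(message.getBytes(StandardCharsets.UTF_8));
            return verifier.verify(Base64.getDecoder().decode(base64Signature));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair deviceKeys = newKeyPair();
        String message = "TEMPERATURE:25.5"; // sample message, assumed format
        String signature = sign(message, deviceKeys.getPrivate());
        System.out.println("genuine:  " + verify(message, signature, deviceKeys.getPublic()));
        System.out.println("tampered: " + verify("TEMPERATURE:99.9", signature, deviceKeys.getPublic()));
    }
}
```

Only a message that verifies against the registered device's key would be passed on, which is the property the transformer depends on when it forwards the extracted message.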
- Validate the content. You can implement a content validation method if you want to validate the content before sending it to subscribed events.
In the case of virtual fire alarm, the content is transformed and directly sent to the subscribed event without any validation.
Configure the event subscription.
Example: VirtualFirealarmEventAdapterSubscription.java
public void onEvent(Object o) {
    String msg = (String) o;
    if (msg != null && !msg.isEmpty()) {
        String[] messages = (msg).split(",");
        String deviceId = messages[0];
        String actualMessage = messages[1];
        if (actualMessage.contains("PUBLISHER")) {
            float temperature = Float.parseFloat(actualMessage.split(":")[2]);
            VirtualFireAlarmUtils.publishToDAS(deviceId, temperature);
        } else {
            float temperature = Float.parseFloat(actualMessage.split(":")[1]);
            VirtualFireAlarmUtils.publishToDAS(deviceId, temperature);
        }
    }
}
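The parsing done in onEvent can be tried out in isolation. The sketch below follows the same rules (the device ID comes before the first comma, and the temperature is the third colon-separated field when the message contains PUBLISHER, otherwise the second) but returns null for malformed input instead of throwing. The Reading class, the parse method, and the sample messages are hypothetical; the actual message format used by the virtual fire alarm is not specified here.

```java
public class FireAlarmEventParsingSketch {

    // Simple holder for a parsed event.
    public static final class Reading {
        public final String deviceId;
        public final float temperature;

        Reading(String deviceId, float temperature) {
            this.deviceId = deviceId;
            this.temperature = temperature;
        }
    }

    // Parses "<deviceId>,<message>"; returns null if the event is empty or malformed.
    static Reading parse(String event) {
        if (event == null || event.isEmpty()) {
            return null;
        }
        // Split only on the first comma so commas inside the message do not truncate it.
        String[] parts = event.split(",", 2);
        if (parts.length < 2) {
            return null;
        }
        String[] fields = parts[1].split(":");
        int temperatureIndex = parts[1].contains("PUBLISHER") ? 2 : 1;
        if (fields.length <= temperatureIndex) {
            return null;
        }
        try {
            return new Reading(parts[0], Float.parseFloat(fields[temperatureIndex].trim()));
        } catch (NumberFormatException e) {
            return null; // the temperature field was not a number
        }
    }

    public static void main(String[] args) {
        Reading plain = parse("device-1,TEMPERATURE:25.5");              // sample input, assumed
        Reading published = parse("device-1,PUBLISHER:device-1:30.0");   // sample input, assumed
        System.out.println(plain.deviceId + " -> " + plain.temperature);
        System.out.println(published.deviceId + " -> " + published.temperature);
        System.out.println(parse("not-an-event") == null);
    }
}
```

Guarding the array indexes matters because the transformer returns an empty string on failure, and any unexpected message shape would otherwise surface as an ArrayIndexOutOfBoundsException or NumberFormatException inside the subscription.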
Configure the utils class to call the event subscription and the content validator and transformer.
Create a method to direct the data stream for validation and transformation from the utils class.
Example: VirtualFireAlarmUtils.java
private static InputEventAdapterConfiguration createMqttInputEventAdapterConfiguration(String name, String type,
        String msgFormat) throws IOException {
    InputEventAdapterConfiguration inputEventAdapterConfiguration = new InputEventAdapterConfiguration();
    inputEventAdapterConfiguration.setName(name);
    inputEventAdapterConfiguration.setType(type);
    inputEventAdapterConfiguration.setMessageFormat(msgFormat);
    File configFile = new File(VirtualFireAlarmConstants.MQTT_CONFIG_LOCATION);
    if (configFile.exists()) {
        Map<String, String> mqttAdapterProperties = new HashMap<>();
        Properties properties = new Properties();
        // try-with-resources closes the stream even if loading fails
        try (InputStream propertyStream = configFile.toURI().toURL().openStream()) {
            properties.load(propertyStream);
        }
        mqttAdapterProperties.put(VirtualFireAlarmConstants.USERNAME_PROPERTY_KEY,
                properties.getProperty(VirtualFireAlarmConstants.USERNAME_PROPERTY_KEY));
        mqttAdapterProperties.put(VirtualFireAlarmConstants.DCR_PROPERTY_KEY, Utils.replaceSystemProperty(
                properties.getProperty(VirtualFireAlarmConstants.DCR_PROPERTY_KEY)));
        mqttAdapterProperties.put(VirtualFireAlarmConstants.BROKER_URL_PROPERTY_KEY, replaceMqttProperty(
                properties.getProperty(VirtualFireAlarmConstants.BROKER_URL_PROPERTY_KEY)));
        mqttAdapterProperties.put(VirtualFireAlarmConstants.SCOPES_PROPERTY_KEY,
                properties.getProperty(VirtualFireAlarmConstants.SCOPES_PROPERTY_KEY));
        mqttAdapterProperties.put(VirtualFireAlarmConstants.CLEAR_SESSION_PROPERTY_KEY,
                properties.getProperty(VirtualFireAlarmConstants.CLEAR_SESSION_PROPERTY_KEY));
        mqttAdapterProperties.put(VirtualFireAlarmConstants.QOS_PROPERTY_KEY,
                properties.getProperty(VirtualFireAlarmConstants.QOS_PROPERTY_KEY));
        mqttAdapterProperties.put(VirtualFireAlarmConstants.CLIENT_ID_PROPERTY_KEY, "");
        mqttAdapterProperties.put(VirtualFireAlarmConstants.TOPIC, VirtualFireAlarmConstants.SUBSCRIBED_TOPIC);
        // Use the custom transformer, but keep the default (no-op) validation.
        mqttAdapterProperties.put(VirtualFireAlarmConstants.CONTENT_TRANSFORMATION,
                VirtualFirealarmMqttContentTransformer.class.getName());
        mqttAdapterProperties.put(VirtualFireAlarmConstants.CONTENT_VALIDATION, "default");
        mqttAdapterProperties.put(VirtualFireAlarmConstants.RESOURCE, "input-event");
        inputEventAdapterConfiguration.setProperties(mqttAdapterProperties);
    }
    return inputEventAdapterConfiguration;
}
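The property-loading step in this method can be sketched independently of the WSO2 classes. The example below reads properties from text and copies only the requested keys into a map, skipping keys that are absent so that no null values end up in the adapter configuration. The key names and the toAdapterProperties helper are hypothetical and are not the constants used by VirtualFireAlarmConstants.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AdapterPropertiesSketch {

    // Loads properties from the given text and keeps only the requested keys that are present.
    static Map<String, String> toAdapterProperties(String propertiesText, String... keys) {
        Properties properties = new Properties();
        try (Reader reader = new StringReader(propertiesText)) {
            properties.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> adapterProperties = new HashMap<>();
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value != null) {
                adapterProperties.put(key, value.trim());
            }
        }
        return adapterProperties;
    }

    public static void main(String[] args) {
        // Sample file content; the key names are assumptions for illustration.
        String text = "username=admin\nqos=0\nurl=tcp://localhost:1886\n";
        Map<String, String> result = toAdapterProperties(text, "username", "qos", "scopes");
        System.out.println(result.get("username"));
        System.out.println(result.get("qos"));
        System.out.println(result.containsKey("scopes"));
    }
}
```

Reading from a file instead only changes the Reader that is opened; the try-with-resources block still guarantees that it is closed.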
Create a method to call the event subscription class.
Example: VirtualFireAlarmUtils.java
public static void setupMqttInputAdapter() throws IOException {
    InputEventAdapterConfiguration inputEventAdapterConfiguration =
            createMqttInputEventAdapterConfiguration(VirtualFireAlarmConstants.MQTT_ADAPTER_NAME,
                    VirtualFireAlarmConstants.MQTT_ADAPTER_TYPE, MessageType.TEXT);
    try {
        PrivilegedCarbonContext.startTenantFlow();
        PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantDomain(
                VirtualFireAlarmConstants.DEVICE_TYPE_PROVIDER_DOMAIN, true);
        VirtualFirealarmManagementDataHolder.getInstance().getInputEventAdapterService()
                .create(inputEventAdapterConfiguration, new VirtualFirealarmEventAdapterSubscription());
    } catch (InputEventAdapterException e) {
        log.error("Unable to create Input Event Adapter : " + VirtualFireAlarmConstants.MQTT_ADAPTER_NAME, e);
    } finally {
        PrivilegedCarbonContext.endTenantFlow();
    }
}