Streaming Machine Learning
Introduction
In the previous tutorial, we looked at how static machine learning and prediction can be done via PMML. In this tutorial, let's see how real-time online machine learning support is offered in Stream Processor through various mechanisms.
After the production of each sweet in the Sweet Factory, its density and volume are measured and sent to WSO2 SP. Based on this data, WSO2 SP needs to predict what the produced sweet is out of 5 possibilities.
This tutorial covers the following concepts:
A Hoeffding Classifier model is trained using real-time data to get predictions.
The other types of algorithms supported are MRulesRegressor, ClusTree, K-Means, and PerceptronClassifier.
Before you begin:
The siddhi-gpl-execution-streamingml extension must be added to the <SP_HOME>/lib directory.
Tutorial steps
Let's get started!
The input data captured for the training phase must include the name of the sweet, its density and volume. Let's define an input stream as shown below to capture this information.
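A definition along the following lines captures these three attributes. The stream and attribute names are the ones used in the complete application at the end of this tutorial; the source annotation is left out here for brevity.

```siddhi
-- Training input: two numeric features plus the known sweet type (the label).
define stream ProductionTrainingStream (density double, volume double, sweetType string);
```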
You need another input stream through which the volume and the density of the unknown sweet must be sent during the predicting phase. Let's define it as follows.
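A minimal sketch of this stream, using the same name as in the complete application at the end of this tutorial. It carries only the two measured features, because the sweet type is what needs to be predicted.

```siddhi
-- Prediction input: the features of a sweet whose type is not yet known.
define stream SweetProductionStream (density double, volume double);
```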
Finally, let's define an output stream to present the output of the predictions.
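The output stream in the complete application at the end of this tutorial has the following shape. It repeats the two input features and adds the predicted type together with the confidence level reported by the classifier.

```siddhi
-- Prediction output: input features, predicted sweet type, and confidence.
define stream PredictionStream (density double, volume double, prediction string, confidenceLevel double);
```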
Now, let's write a simple query to select values from the trainer stream and write them to a temporary stream.
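Such a pass-through query can be sketched as shown here. It assumes the trainer stream is named ProductionTrainingStream, as in the complete application at the end of this tutorial, and it does not train anything yet.

```siddhi
-- Forward every training event, unchanged, to an undefined in-memory stream.
from ProductionTrainingStream
select *
insert into TemporaryStream;
```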
Here, the TemporaryStream stream is an in-memory stream with no definition and no sources/sinks. Therefore, it can be used for routing events that are not needed elsewhere.
Now, let's update the query to enable it to train the Hoeffding tree. This can be achieved by adding the #streamingml annotation to the input stream and setting its type to HoeffdingTree. Here, you can choose either to train the tree (updateHoeffdingTree) or to produce an output based on the learnt model (hoeffdingTreeClassifier).
You need to add two parameters to the updateHoeffdingTree directive for the following.
The name of the model that is built or updated.
The number of classes in the predicted output. For example, if the training data can contain five types of sweets, the value for this parameter must be 5 to indicate that there are five possible values for the attribute that is being predicted.
In addition, you can also specify the attributes in the input stream that are needed for the training operation. For example, if you are using the attributes X, Y, and Z from the stream to train the model so that it can predict the value for the K attribute, you must also include the X, Y, Z, and K attributes in the trainer stream definition. The last attribute is considered as the attribute for which the prediction is needed.
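The generic X, Y, Z, and K case can be sketched as follows. The stream name TrainerStream, the model name ExampleModel, the class count of 3, and the attribute types are all hypothetical and chosen only for illustration. The types mirror the sweet example, with numeric features and a string label.

```siddhi
-- Hypothetical trainer stream: X, Y, Z are features; K is the label to predict.
define stream TrainerStream (X double, Y double, Z double, K string);

-- Model name first, then the number of classes, then the features,
-- with the attribute to be predicted (K) listed last.
from TrainerStream#streamingml:updateHoeffdingTree('ExampleModel', 3, X, Y, Z, K)
select *
insert into TemporaryStream;
```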
Taking the above into account, let's update the query on the trainer stream with these parameters.
Now that the annotation is complete, the events going through to the TemporaryStream stream contain the accuracy evaluation of the model. This is not needed in the end result, and therefore, you can omit it.
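For reference, the training query as it appears in the complete application at the end of this tutorial is shown here. The model name, the number of classes, and the attribute list map directly onto the parameters described above.

```siddhi
-- Train (or update) the model named 'SweetTypeModel' with 5 possible classes.
-- density and volume are the features; sweetType, listed last, is the label.
@info(name = 'training-query')
from ProductionTrainingStream#streamingml:updateHoeffdingTree('SweetTypeModel', 5, density, volume, sweetType)
select *
insert into TemporaryStream;
```

Because nothing consumes TemporaryStream, the accuracy values it carries are simply discarded.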
Now let's consider the other end of the prediction process where the model makes predictions based on the density and volume. Let's write a simple query to select values from the prediction stream and direct them to the output stream.
Let's also add a second #streamingml annotation, this time to the SweetProductionStream stream (as you previously did for the trainer stream in step 5).
This time, we need to use the classifier to perform a prediction instead of training the tree. Therefore, let's add the hoeffdingTreeClassifier annotation instead of the updateHoeffdingTree annotation you previously added.
Let's specify the following parameters for the model.
The name of the model to which you want to refer. You can refer to the SweetTypeModel model that you previously trained.
The stream attributes to be used for querying the model for predictions. In this scenario, you need to query by the density and the volume of the unknown sweet.
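With these two parameters, the prediction query in the complete application at the end of this tutorial takes the following form. The select clause picks up the prediction and confidenceLevel attributes that the classifier adds to each event.

```siddhi
-- Query the previously trained 'SweetTypeModel' using density and volume.
@info(name = 'prediction-query')
from SweetProductionStream#streamingml:hoeffdingTreeClassifier('SweetTypeModel', density, volume)
select density, volume, prediction, confidenceLevel
insert into PredictionStream;
```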
Here, the confidenceLevel attribute is a standard return attribute of the classifier. It indicates the extent to which the generated prediction is likely to be accurate.
The final query (with source and sink configurations added) looks as follows.
@App:name('SugerSyrupPredictionApp')
@source(type = 'http', @map(type = 'json'))
define stream ProductionTrainingStream (density double, volume double, sweetType string );
@source(type = 'http', @map(type = 'json'))
define stream SweetProductionStream (density double, volume double);
@sink(type='log', prefix='Predicted sweet type:')
define stream PredictionStream (density double, volume double, prediction string, confidenceLevel double);
@info(name = 'training-query')
from ProductionTrainingStream#streamingml:updateHoeffdingTree('SweetTypeModel', 5, density, volume, sweetType)
select *
insert into TemporaryStream;
@info(name = 'prediction-query')
from SweetProductionStream#streamingml:hoeffdingTreeClassifier('SweetTypeModel', density, volume)
select density, volume, prediction, confidenceLevel
insert into PredictionStream;