Streaming Machine Learning
Introduction
In the previous tutorial, we looked at how static machine learning and prediction can be done via PMML. In this tutorial, let's see how real-time online machine learning support is offered in Stream Processor through various mechanisms.
After the production of each sweet in the Sweet Factory, its density and volume are measured and sent to WSO2 SP. Based on this data, WSO2 SP needs to predict what the produced sweet is out of 5 possibilities.
Before you begin:
The siddhi-gpl-execution-streamingml extension needs to be added to the <SP_HOME>/lib directory.
Tutorial steps
Let's get started!
The input data captured for the training phase must include the name of the sweet, its density and volume. Let's define an input stream as shown below to capture this information.
define stream ProductionTrainingStream (density double, volume double, sweetType string );
You need another input stream through which the volume and the density of the unknown sweet must be sent during the predicting phase. Let's define it as follows.
define stream SweetProductionStream (density double, volume double);
Finally, let's define an output stream to present the output of the predictions.
define stream PredictionStream (density double, volume double, prediction string, confidenceLevel double);
Now, let's write a simple query to select values from the trainer stream and write them to a temporary stream.
from ProductionTrainingStream select * insert into TemporaryStream;
Here, the TemporaryStream stream is an in-memory stream with no definition and no sources/sinks. Therefore, it can be used for routing events that are not needed elsewhere.
Now, let's update the query to enable it to train the Hoeffding tree. This can be achieved by adding the #streamingml annotation to the input stream, and setting its type to HoeffdingTree. Here, we can specify whether to train the tree (updateHoeffdingTree) or to produce an output based on the learnt model (hoeffdingTreeClassifier).
from ProductionTrainingStream#streamingml:updateHoeffdingTree()
select *
insert into TemporaryStream;
You need to add the following two parameters to the updateHoeffdingTree directive:
- The name of the model that is built or updated.
- The number of classes in the predicted output. e.g., if the training data can contain five types of sweets, the value for this parameter must be 5 in order to indicate that there are five possible values for the attribute that is being predicted.
In addition, you can also specify the attributes in the input stream that are needed for the training operation. e.g., if you are using the attributes X, Y, and Z from the stream to train the model so that it can predict the value for the K attribute, you must also include the X, Y, Z, and K attributes in the trainer stream definition. The last attribute is considered as the attribute for which the prediction is needed.
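As a minimal sketch of the generic X, Y, Z, K case described above, the training query could look like the following. The stream names TrainingStream and TrainingOutputStream, the model name ExampleModel, the attribute types, and the class count of 3 are all hypothetical placeholders chosen for illustration; only the parameter order (model name, number of classes, feature attributes, then the attribute to predict last) follows the description above.

```siddhi
-- Hypothetical stream: X, Y and Z are the features, K is the attribute to predict.
define stream TrainingStream (X double, Y double, Z double, K string);

-- 'ExampleModel' and the class count of 3 are placeholder values.
-- K is listed last, so it is treated as the attribute to be predicted.
from TrainingStream#streamingml:updateHoeffdingTree('ExampleModel', 3, X, Y, Z, K)
select *
insert into TrainingOutputStream;
```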
Taking the above into account, let's update the training query as follows.
from ProductionTrainingStream#streamingml:updateHoeffdingTree('SweetTypeModel', 5, density, volume, sweetType)
select *
insert into TemporaryStream;
Now that the annotation is complete, the events going through to the TemporaryStream stream contain the accuracy evaluation of the model. This is not needed in the end result, and therefore, you can omit it.
Now let's consider the other end of the prediction process where the model makes predictions based on the density and volume. Let's write a simple query to select values from the prediction stream and direct them to the output stream.
from SweetProductionStream select density, volume, prediction, confidenceLevel insert into PredictionStream;
Let's also add a second #streamingml annotation, this time to the SweetProductionStream stream, so that the arriving events are processed by the model (as you previously did for the training stream).
from SweetProductionStream#streamingml
select density, volume, prediction, confidenceLevel
insert into PredictionStream;
This time, we need to use the classifier to perform a prediction instead of training the tree. Therefore, let's add the hoeffdingTreeClassifier annotation instead of the updateHoeffdingTree annotation you previously added.
from SweetProductionStream#streamingml:hoeffdingTreeClassifier()
select density, volume, prediction, confidenceLevel
insert into PredictionStream;
Let's specify the following parameters for the model.
- The name of the model to which you want to refer. You can refer to the SweetTypeModel model that you previously trained.
- The stream attributes to be used for querying the model for predictions. In this scenario, you need to query by the density and the volume of the unknown sweet.
from SweetProductionStream#streamingml:hoeffdingTreeClassifier('SweetTypeModel', density, volume)
select density, volume, prediction, confidenceLevel
insert into PredictionStream;
Here, the confidenceLevel attribute is a standard return attribute of the classifier. It indicates the extent to which the generated prediction can be considered accurate.
The final query (with source and sink configurations added) looks as follows.
@App:name('SugerSyrupPredictionApp')

@source(type = 'http', @map(type = 'json'))
define stream ProductionTrainingStream (density double, volume double, sweetType string );

@source(type = 'http', @map(type = 'json'))
define stream SweetProductionStream (density double, volume double);

@sink(type='log', prefix='Predicted sweet type:')
define stream PredictionStream (density double, volume double, prediction string, confidenceLevel double);

@info(name = 'training-query')
from ProductionTrainingStream#streamingml:updateHoeffdingTree('SweetTypeModel', 5, density, volume, sweetType)
select *
insert into TemporaryStream;

@info(name = 'prediction-query')
from SweetProductionStream#streamingml:hoeffdingTreeClassifier('SweetTypeModel', density, volume)
select density, volume, prediction, confidenceLevel
insert into PredictionStream;
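Because the classifier returns a confidenceLevel with every prediction, one possible follow-up is to forward only the predictions that the model is reasonably sure about. The query below is a sketch of that idea and is not part of the tutorial application. It assumes that confidenceLevel is reported on a 0 to 1 scale, and both the 0.8 threshold and the ConfidentPredictionStream stream name are hypothetical.

```siddhi
-- Hypothetical addition: keep only predictions at or above an assumed
-- confidence threshold of 0.8 (assumes confidenceLevel lies between 0 and 1).
@info(name = 'confident-prediction-query')
from PredictionStream[confidenceLevel >= 0.8]
select density, volume, prediction, confidenceLevel
insert into ConfidentPredictionStream;
```

Like TemporaryStream, the ConfidentPredictionStream stream here has no explicit definition, so a sink would need to be attached to a defined stream before the filtered events become visible anywhere.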