
WSO2 CEP Extension for ML Predictions

This extension lets you use machine learning models generated with WSO2 ML within WSO2 Complex Event Processor (CEP) to make predictions. It integrates WSO2 ML with WSO2 CEP so that you can perform real-time predictions on an event stream by applying a model generated by WSO2 ML. An input event stream received by an event receiver of WSO2 CEP is processed by an execution plan, which is written in the Siddhi language and applies the model to the stream. The output of this processing, which includes the prediction, is published to an output stream through an event publisher of WSO2 CEP. For more information on WSO2 CEP, go to WSO2 CEP Documentation.
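The flow described above (receive an event, apply the model in the execution plan, publish the enriched event) can be pictured with a small Python sketch. It is only an illustration under stated assumptions: toy_model, run_execution_plan, and the threshold rule are hypothetical stand-ins rather than WSO2 APIs, and the attribute names are borrowed from the query examples on this page.

```python
# Illustrative sketch only: none of these names are WSO2 CEP or WSO2 ML APIs.

def toy_model(features):
    """Hypothetical stand-in for a model generated by WSO2 ML."""
    # Assumed rule for illustration: flag the event when BMI exceeds 30.
    return 1.0 if features["BMI"] > 30.0 else 0.0


def run_execution_plan(input_events, model, response_variable):
    """Mimic the execution plan: append the prediction to each input event."""
    output_events = []
    for event in input_events:
        enriched = dict(event)                      # keep the input attributes
        enriched[response_variable] = model(event)  # add the predicted value
        output_events.append(enriched)
    return output_events


# Events as an event receiver might deliver them (values are made up).
input_stream = [
    {"BMI": 33.6, "Age": 50.0},
    {"BMI": 26.6, "Age": 31.0},
]

# The resulting list plays the role of the output stream handed to a publisher.
prediction_stream = run_execution_plan(input_stream, toy_model, "Class")
for event in prediction_stream:
    print(event)
```

Each output event carries every input attribute plus one extra attribute named after the response variable, which is the shape the Siddhi examples on this page declare for PredictionStream.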

Machine Learner models are not backward compatible. For example, models generated using WSO2 ML 1.0.0 cannot be used with WSO2 ML 1.1.0 functionality.


Siddhi syntax for the extension

There are two possible Siddhi query syntaxes to use the extension in an execution plan as follows.

  1. <double|float|long|int|string|boolean> predict(<string> pathToMLModel, <string> dataType)
    • Extension Type: StreamProcessor
    • Description: Returns an output event with an additional attribute, named after the response variable of the model, that holds the value predicted from the feature values extracted from the input event.
    • Parameter: pathToMLModel: The file path or the registry path where the ML model is located. If the model is stored in the registry, the value of this parameter should have the prefix registry:
    • Parameter: dataType: Data type of the predicted value (double, float, long, integer/int, string, boolean/bool).
    • Parameter: percentileValue: Percentile value for the prediction. It should be a double value between 0 and 100. This parameter is relevant only when the algorithm of the model used for prediction is of the Anomaly Detection type.
    • Example: predict('registry:/_system/governance/mlmodels/indian-diabetes-model', 'double')

  2. <double|float|long|int|string|boolean> predict(<string> pathToMLModel, <string> dataType, <double> input)
    • Extension Type: StreamProcessor
    • Description: Returns an output event with an additional attribute, named after the response variable of the model, that holds the value predicted from the feature values extracted from the input event.
    • Parameter: pathToMLModel: The file path or the registry path where the ML model is located. If the model is stored in the registry, the value of this parameter should have the prefix registry:
    • Parameter: dataType: Data type of the predicted value (double, float, long, integer/int, string, boolean/bool).
    • Parameter: input: A variable attribute of the input stream whose value is sent to the ML model as a feature value for the prediction. The function does not accept constant values as input parameters. You can have multiple input parameters.
    • Parameter: percentileValue: Percentile value for the prediction. It should be a double value between 0 and 100. This parameter is relevant only when the algorithm of the model used for prediction is of the Anomaly Detection type.
    • Example: predict('registry:/_system/governance/mlmodels/indian-diabetes-model', 'double', NumPregnancies, TSFT, DPF, BMI, DBP, PG2, Age, SI2)
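The constraints on dataType and percentileValue listed above can be summarised as a pair of checks. The Python sketch below is not part of the extension: the helper names are hypothetical, treating integer/int and boolean/bool as aliases is an assumption drawn from how the parameter description is written, and including the bounds 0 and 100 is also an assumption.

```python
# Illustrative sketch only: these helpers are hypothetical, not part of the extension.

# Accepted spellings of dataType, mapped to one canonical name each.
# Treating integer/int and boolean/bool as aliases is an assumption.
DATA_TYPE_ALIASES = {
    "double": "double",
    "float": "float",
    "long": "long",
    "integer": "int",
    "int": "int",
    "string": "string",
    "boolean": "bool",
    "bool": "bool",
}


def normalize_data_type(data_type):
    """Return the canonical data type name, or raise ValueError if unsupported."""
    if data_type not in DATA_TYPE_ALIASES:
        raise ValueError("Unsupported dataType: %r" % (data_type,))
    return DATA_TYPE_ALIASES[data_type]


def check_percentile(percentile_value):
    """Accept a percentile between 0 and 100 (bounds included by assumption)."""
    value = float(percentile_value)
    if not 0.0 <= value <= 100.0:
        raise ValueError("percentileValue must be between 0 and 100")
    return value
```

For instance, normalize_data_type("integer") returns "int", and check_percentile(120.0) raises ValueError.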

Siddhi query examples

A few Siddhi query examples that can be used in execution plans within WSO2 CEP based on the above syntaxes are as follows.

Example 1

@Import('InputStream:1.0.0')
define stream InputStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double);
@Export('PredictionStream:1.0.0')
define stream PredictionStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double, Class double);
from InputStream#ml:predict('registry:/_system/governance/ml/indian-diabetes-model', 'double')
select *
insert into PredictionStream;

Example 2

@Import('InputStream:1.0.0')
define stream InputStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double);
@Export('PredictionStream:1.0.0')
define stream PredictionStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double, Class double);
from InputStream#ml:predict('registry:/_system/governance/ml/indian-diabetes-model', 'double', NumPregnancies, TSFT, DPF, BMI, DBP, PG2, Age, SI2)
select *
insert into PredictionStream;

Example 3

@Import('InputStream:1.0.0')
define stream InputStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double);
@Export('OutStream:1.0.0')
define stream OutStream (Class double);
from InputStream#ml:predict('registry:/_system/governance/ml/indian-diabetes-model', 'double', NumPregnancies, TSFT, DPF, BMI, DBP, PG2, Age, SI2)
select Class
insert into OutStream;

Example 4

@Import('InputStream:1.0.0')
define stream InputStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double);
@Export('PredictionStream:1.0.0')
define stream PredictionStream (NumPregnancies double, TSFT double, DPF double, BMI double, DBP double, PG2 double, Age double, SI2 double, prediction string);
from InputStream#ml:predict('registry:/_system/governance/ml/indian-diabetes-model', 'string', 95.0)
select *
insert into PredictionStream;

In the above examples, the path to the ML model is specified as registry:/_system/governance/ml/indian-diabetes-model. When you specify a path with the registry prefix, WSO2 ML always gets the model from /_system/governance, whether or not this location is included in the path. Therefore, you may alternatively specify this path as registry:/ml/indian-diabetes-model.
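The path rule described above can be sketched in a few lines of Python. This is an illustration of the documented behaviour, not the extension's actual code: resolve_model_path and the two constants are hypothetical names, and treating any value without the registry: prefix as a plain file path is an assumption.

```python
# Illustrative sketch only: resolve_model_path is a hypothetical helper.

REGISTRY_PREFIX = "registry:"
GOVERNANCE_ROOT = "/_system/governance"


def resolve_model_path(path_to_ml_model):
    """Return (storage, path) for a pathToMLModel value."""
    if not path_to_ml_model.startswith(REGISTRY_PREFIX):
        # Assumption: anything without the prefix is a file system path.
        return ("file", path_to_ml_model)

    registry_path = path_to_ml_model[len(REGISTRY_PREFIX):]
    if not registry_path.startswith("/"):
        registry_path = "/" + registry_path

    # Drop the governance root if the caller already included it,
    # so that both spellings end up at the same location.
    if registry_path.startswith(GOVERNANCE_ROOT + "/"):
        registry_path = registry_path[len(GOVERNANCE_ROOT):]

    return ("registry", GOVERNANCE_ROOT + registry_path)


print(resolve_model_path("registry:/_system/governance/ml/indian-diabetes-model"))
print(resolve_model_path("registry:/ml/indian-diabetes-model"))
```

Both calls resolve to the same registry location, /_system/governance/ml/indian-diabetes-model, which is why the shorter form is an acceptable alternative.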

Prerequisites

Set up the following prerequisites before starting the configurations.

  1. Download WSO2 ML, and start the server. For instructions, see Getting Started.
  2. Generate a model using WSO2 ML which you will use to make the predictions. For instructions on generating a model in WSO2 ML, see Generating Models.
  3. Download WSO2 CEP, and start the server. For instructions, see Getting Started.

Installing required features in WSO2 CEP

Follow the steps below to install the required features in WSO2 CEP.

  1. Log in to the WSO2 CEP management console using admin/admin credentials and the following URL: https://<CEP_HOME>:<CEP_PORT>/carbon/
  2. Click Configure, and then click Features.
  3. Click Repository Management, and then click Add Repository.
  4. Enter the details as shown below to add the Carbon P2 repository.


  5. Click Add.
  6. Click the Available Features tab, and select the repository added in the previous step.
  7. Clear the Group features by category check box.

  8. Click Find Features. It can take a while to list out all the available features in the feature repository. Once they are listed, select the following features. 

    • Machine Learner Core 

    • Machine Learner Commons 

    • Machine Learner Database Service 

    • ML Siddhi Extension

    If you cannot see this feature:

    • Try adding a more recent P2 repository. The repository you added could be deprecated.
    • Check the Installed Features tab to see whether the feature is already installed.

  9. Once the features are selected, click Install to proceed with the installation. 

  10. Click Next, and then select I accept the terms of the license agreement.
  11. Once the installation is completed, click Restart Now, and click Yes in the message which pops up.

    When you are installing ML features to the CEP extension within WSO2 DAS, click Restart Later. Then stop the server and start it again using one of the following commands.

    On Windows:  <PRODUCT_HOME>\bin\wso2server.bat --run -DdisableMLSparkCtx=true 
    On Linux/Solaris/Mac OS: sh <PRODUCT_HOME>/bin/wso2server.sh -DdisableMLSparkCtx=true

When installing ML features in an Apache Storm cluster, it is recommended to use a pom file instead of the Management Console. The Management Console only allows the features to be installed in the default profile instead of in all the profiles of the CEP nodes, and as a result, an exception occurs when events are sent to nodes that do not use the default profile. For more information, see Installing Features using pom Files.

 

When you run WSO2 CEP in distributed mode, carry out the following steps.

  1. Uncomment the following dependencies in the <CEP_HOME>/samples/utils/storm-dependencies-jar/pom.xml file as shown below.

    <!-- Uncomment the following depedency section if you want to include Siddhi ML extension as part of
        Storm dependencies -->
    
            <dependency>
                <groupId>org.wso2.carbon.ml</groupId>
                <artifactId>org.wso2.carbon.ml.siddhi.extension</artifactId>
                <version>${carbon.ml.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.carbon.ml</groupId>
                <artifactId>org.wso2.carbon.ml.core</artifactId>
                <version>${carbon.ml.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.carbon.ml</groupId>
                <artifactId>org.wso2.carbon.ml.database</artifactId>
                <version>${carbon.ml.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.carbon.ml</groupId>
                <artifactId>org.wso2.carbon.ml.commons</artifactId>
                <version>${carbon.ml.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.carbon.metrics</groupId>
                <artifactId>org.wso2.carbon.metrics.manager</artifactId>
                <version>${carbon.metrics.version}</version>
            </dependency>
    
            <!--&lt;!&ndash; Dependencies for Spark &ndash;&gt;-->
            <dependency>
                <groupId>org.wso2.orbit.org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>${spark.core.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.org.apache.spark</groupId>
                <artifactId>spark-mllib_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.org.scalanlp</groupId>
                <artifactId>breeze_2.10</artifactId>
                <version>${breeze.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.jblas</groupId>
                <artifactId>jblas</artifactId>
                <version>${jblas.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.spire-math</groupId>
                <artifactId>spire_2.10</artifactId>
                <version>${spire.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.client.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.uncommons.maths</groupId>
                <artifactId>uncommons-maths</artifactId>
                <version>${uncommons.maths.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.json4s</groupId>
                <artifactId>json4s-jackson_2.10</artifactId>
                <version>${json4s.jackson.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.github.fommil.netlib</groupId>
                <artifactId>core</artifactId>
                <version>${fommil.netlib.version}</version>
            </dependency>
            <dependency>
                <groupId>org.wso2.orbit.sourceforge.f2j</groupId>
                <artifactId>arpack_combined</artifactId>
                <version>${arpack.combined.version}</version>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-csv</artifactId>
                <version>${commons.csv.version}</version>
            </dependency>
    <!-- ML extension dependencies -->
    
             <include>org.wso2.orbit.org.apache.spark:spark-core_2.10
             </include>
             <include>org.wso2.orbit.org.apache.spark:spark-sql_2.10
             </include>
             <include>org.wso2.orbit.org.apache.spark:spark-mllib_2.10
             </include>
             <include>org.wso2.orbit.org.apache.spark:spark-streaming_2.10
             </include>
             <include>org.wso2.orbit.org.scalanlp:breeze_2.10</include>
             <include>org.wso2.orbit.jblas:jblas</include>
             <include>org.wso2.orbit.spire-math:spire_2.10</include>
             <include>org.wso2.orbit.org.apache.hadoop:hadoop-client
             </include>
             <include>org.wso2.uncommons.maths:uncommons-maths</include>
             <include>org.wso2.json4s:json4s-jackson_2.10</include>
             <include>org.slf4j:slf4j-api</include>
             <include>org.wso2.orbit.github.fommil.netlib:core</include>
             <include>org.wso2.orbit.sourceforge.f2j:arpack_combined
             </include>
             <include>org.scala-lang:scala-library</include>
             <include>org.apache.commons:commons-csv</include>
             <include>org.wso2.carbon.ml:org.wso2.carbon.ml.core</include>
             <include>org.wso2.carbon.ml:org.wso2.carbon.ml.database
             </include>
             <include>org.wso2.carbon.ml:org.wso2.carbon.ml.commons</include>
             <include>
                 org.wso2.carbon.ml:org.wso2.carbon.ml.siddhi.extension
             </include>
             <include>
                 org.wso2.carbon.metrics:org.wso2.carbon.metrics.manager
             </include>


  2. Run the following command from the <CEP_HOME>/samples/utils/storm-dependencies-jar directory.
    mvn clean install 
    This will generate a jar in the target directory.

 

Creating the input stream

Follow the steps below to create the input stream in WSO2 CEP.

  1. Log in to the CEP management console using the following URL, if you are not already logged in: https://<CEP_HOME>:9443/carbon/
  2. Click Main, and then click Event Streams in the Event Processor menu.
  3. Click Add Event Stream.
  4. Enter details of the stream definition that you want to create as shown below.
    add input event stream
  5. Click Add Event Stream to create the event stream in the system. You can view the new input stream in the list of all available event streams as shown below.
    event streams list

Creating the output stream

Follow the steps below to create the output stream in WSO2 CEP.

  1. Log in to the CEP management console using the following URL, if you are not already logged in: https://<CEP_HOME>:<CEP_PORT>/carbon/
  2. Click Main, and then click Event Streams in the Event Processor menu.
  3. Click Add Event Stream.
  4. Enter details of the stream definition that you want to create as shown below.
  5. Click Add Event Stream to create the event stream in the system. You can view the new output stream in the list of all available event streams as shown below.
    output stream in event streams list

Creating the execution plan

Follow the steps below to create the execution plan in WSO2 CEP.

  1. Log in to the CEP management console using the following URL, if you are not already logged in: https://<CEP_HOME>:<CEP_PORT>/carbon/
  2. Click Main, and then click Execution Plans in the Streaming Analytics menu.
  3. Click Add Execution Plan.
  4. Select InputStream:1.0.0 for Import Stream from the list, and click Import.
  5. Enter the name of the output stream (PredictionStream) for Value Of, select PredictionStream:1.0.0 from the Stream ID list, and click Export for Export Stream.

  6. Enter the following Siddhi query as shown below.

    In the execution plan below, replace the first argument of the predict() function with the file path or registry path of the downloaded ML model, and the second argument with the data type of the response variable, as described in Siddhi syntax for the extension.

    from InputStream#ml:predict('registry:/_system/governance/ml/indian-diabetes-model', 'double')
    select *
    insert into PredictionStream;

    Click Validate Query Expressions, to validate the query you entered.

  7. Click Add Execution Plan to save the execution plan in the system. You can view the execution plan in the list of all available execution plans as shown below.
    execution plans list

Creating an event publisher

Follow the steps below to create an event publisher in WSO2 CEP.

  1. Log in to the CEP management console using the following URL, if you are not already logged in: https://<CEP_HOME>:<CEP_PORT>/carbon/
  2. Click Main, and then click Publishers in the Event menu.
  3. Click Add Event Publisher.
  4. Enter the details as shown below to create a logger type event publisher.
  5. Click Add Event Publisher to create the event publisher in the system. You can view the new event publisher in the list of available event publishers.
  6. Click the corresponding Enable Tracing button as shown below.
    enable event tracing

Simulating events

Follow the steps below to simulate the sending of events in WSO2 CEP.

Queries are collectively processed by all the nodes in a CEP cluster. Therefore, make sure that the ML model is located in the same path in all the nodes. This allows all the nodes to access the model when events are sent to a specific node.

  1. Log in to the CEP management console using the following URL, if you are not already logged in: https://<CEP_HOME>:<CEP_PORT>/carbon/
  2. Click Tools, and then click Event Simulator.
  3. Select InputStream:1.0.0 for the Event Stream Name.
  4. Enter the feature values to predict, in the Stream Attributes input fields as shown below.

  5. Click Send to send the events. You can view the values of the output stream named PredictionStream, logged by the event publisher, in the back end console logs of WSO2 CEP as shown below.

If you are using a deep learning model to make predictions without H2O runtime, see Using Deep Learning Models without H2O Runtime.
