A Siddhi function extension consumes zero or more parameters for each event and outputs a single attribute. It can be used to manipulate event attributes to generate a new attribute, in the same way as a function operator.

To implement a custom function extension, create a class extending "org.wso2.siddhi.core.executor.function.FunctionExecutor", create an appropriate .siddhiext extension mapping file, compile the class, and build the jar file containing the .class and .siddhiext files. Add the jar to the Siddhi class path. When the extension runs on WSO2 CEP, add the jar to the <CEP_HOME>/repository/components/lib directory.

For example, if you have created the extension with namespace "math" and function name "sin", you can refer to it in a query as follows:

from InValueStream
select math:sin(inValue) as sinValue
insert into OutMediationStream;

Note: From CEP 3.0.0 onwards, FunctionExecutor is to be used for writing both custom expressions and conditions.

A sample implementation of a custom function extension is shown below:

/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.math;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

/*
 * sin(a);
 * Returns the sine of a (a is in radians).
 * Accept Type(s): DOUBLE/INT/FLOAT/LONG
 * Return Type(s): DOUBLE
 */
public class SinFunctionExtension extends FunctionExecutor {

    /**
     * The initialization method for SinFunctionExtension. This method is called before the other methods.
     *
     * @param attributeExpressionExecutors the executors of each function parameter
     * @param executionPlanContext         the context of the execution plan
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        if (attributeExpressionExecutors.length != 1) {
            throw new ExecutionPlanValidationException("Invalid no of arguments passed to math:sin() function, " +
                    "required 1, but found " + attributeExpressionExecutors.length);
        }
        Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
        if (!((attributeType == Attribute.Type.DOUBLE)
                || (attributeType == Attribute.Type.INT)
                || (attributeType == Attribute.Type.FLOAT)
                || (attributeType == Attribute.Type.LONG))) {
            throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of math:sin() function, " +
                    "required " + Attribute.Type.INT + " or " + Attribute.Type.LONG +
                    " or " + Attribute.Type.FLOAT + " or " + Attribute.Type.DOUBLE +
                    ", but found " + attributeType.toString());
        }
    }

    /**
     * The main execution method which will be called upon event arrival
     * when there are more than one function parameter
     *
     * @param data the runtime values of function parameters
     * @return the function result
     */
    @Override
    protected Object execute(Object[] data) {
        return null;
    }

    /**
     * The main execution method which will be called upon event arrival
     * when there are zero or one function parameter
     *
     * @param data null if the function parameter count is zero or
     *             runtime data value of the function parameter
     * @return the function result
     */
    @Override
    protected Object execute(Object data) {
        if (data != null) {
            //type-conversion
            if (data instanceof Integer) {
                int inputInt = (Integer) data;
                return Math.sin((double) inputInt);
            } else if (data instanceof Long) {
                long inputLong = (Long) data;
                return Math.sin((double) inputLong);
            } else if (data instanceof Float) {
                float inputFloat = (Float) data;
                return Math.sin((double) inputFloat);
            } else if (data instanceof Double) {
                return Math.sin((Double) data);
            }
        } else {
            throw new ExecutionPlanRuntimeException("Input to the math:sin() function cannot be null");
        }
        return null;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.DOUBLE;
    }


    /**
     * Used to collect the serializable state of the processing element, that needs to be
     * persisted for reconstructing the element to the same state at a different point in time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return null;
    }

    /**
     * Used to restore the serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point in time.
     *
     * @param state the stateful objects of the element as an array, in
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        //Implement restore state logic.
    }
}
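
The type-conversion step in execute(Object data) can be tried out without a Siddhi installation. The following is a minimal standalone sketch, not part of the Siddhi API: the class name SinSketch is hypothetical, and IllegalArgumentException stands in for ExecutionPlanRuntimeException so that the code compiles with the JDK alone. It widens the input through Number.doubleValue() instead of one cast per type, which is assumed to give the same result for the four accepted types.

```java
// Standalone sketch of the conversion logic in execute(Object data).
// SinSketch is a hypothetical name; nothing here is a Siddhi class.
final class SinSketch {

    private SinSketch() {
    }

    /**
     * Accepts Integer, Long, Float or Double and returns the sine as a Double.
     * Returns null for any other type, and rejects null input.
     */
    static Double sin(Object data) {
        if (data == null) {
            // The sample extension throws ExecutionPlanRuntimeException here;
            // a JDK exception is used so the sketch needs no Siddhi jar.
            throw new IllegalArgumentException("Input to the math:sin() function cannot be null");
        }
        if (data instanceof Integer || data instanceof Long
                || data instanceof Float || data instanceof Double) {
            // All four accepted types are Numbers, so one widening call covers them.
            return Math.sin(((Number) data).doubleValue());
        }
        // Same fall-through as the sample: unsupported types yield null.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(sin(0));            // Integer input
        System.out.println(sin(Math.PI / 2));  // Double input
        System.out.println(sin("text"));       // unsupported type
    }
}
```

In the real extension, init() already rejects unsupported attribute types when the query is validated, so the null fall-through should not be reached at runtime.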

A sample math.siddhiext extension mapping file for the custom function extension is shown below:

#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sin=org.wso2.siddhi.extension.math.SinFunctionExtension
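
To check what a mapping file resolves to before packaging the jar, the file can be read as plain key=value pairs. The sketch below assumes that a .siddhiext file follows the Java properties format (lines starting with # are comments, one name=class entry per function). This is an assumption based on the sample above, not a description of how Siddhi loads the file. The class name SiddhiExtMappingSketch is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: reads .siddhiext content as Java properties.
// Siddhi's own loader is not shown in this document and may differ.
final class SiddhiExtMappingSketch {

    private SiddhiExtMappingSketch() {
    }

    /** Parses mapping file content into function-name -> class-name pairs. */
    static Properties parse(String content) {
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(content));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return mapping;
    }

    public static void main(String[] args) {
        String content = "# license header omitted\n"
                + "\n"
                + "sin=org.wso2.siddhi.extension.math.SinFunctionExtension\n";
        Properties mapping = parse(content);
        // The namespace "math" is taken here from the file name math.siddhiext.
        System.out.println("math:sin -> " + mapping.getProperty("sin"));
    }
}
```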