The Siddhi Function Extension consumes zero or more parameters for each event and outputs a single attribute. It can be used to manipulate event attributes and generate new attributes, as the Function operator does.
To implement a custom function extension, follow the procedure below.
- Create a class extending org.wso2.siddhi.core.executor.function.FunctionExecutor.
- Create an appropriate .siddhiext extension mapping file.
- Compile the class.
- Build the jar containing the .class and the .siddhiext files.
- Add the jar to the Siddhi class path. If you need to run the extension on WSO2 DAS, add it to the <DAS_HOME>/repository/components/dropins directory.
For example, a custom function extension created with math as the namespace and sin as the function name can be referred to in a query as shown below.
```sql
from InValueStream
select math:sin(inValue) as sinValue
insert into OutMediationStream;
```
Note: From CEP 3.0.0 onwards, FunctionExecutor is to be used for writing both custom expressions and conditions.
The following is a sample implementation of a custom function extension.
```java
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.wso2.siddhi.extension.math;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

/*
 * sin(a);
 * Returns the sine of a (a is in radians).
 * Accept Type(s): DOUBLE/INT/FLOAT/LONG
 * Return Type(s): DOUBLE
 */
public class SinFunctionExtension extends FunctionExecutor {

    /**
     * The initialization method for SinFunctionExtension.
     * This method is called before the other methods.
     *
     * @param attributeExpressionExecutors the executors of each function parameter
     * @param executionPlanContext         the context of the execution plan
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors,
                        ExecutionPlanContext executionPlanContext) {
        if (attributeExpressionExecutors.length != 1) {
            throw new ExecutionPlanValidationException(
                    "Invalid no of arguments passed to math:sin() function, "
                    + "required 1, but found " + attributeExpressionExecutors.length);
        }
        Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
        if (!((attributeType == Attribute.Type.DOUBLE)
                || (attributeType == Attribute.Type.INT)
                || (attributeType == Attribute.Type.FLOAT)
                || (attributeType == Attribute.Type.LONG))) {
            throw new ExecutionPlanValidationException(
                    "Invalid parameter type found for the argument of math:sin() function, "
                    + "required " + Attribute.Type.INT + " or " + Attribute.Type.LONG
                    + " or " + Attribute.Type.FLOAT + " or " + Attribute.Type.DOUBLE
                    + ", but found " + attributeType.toString());
        }
    }

    /**
     * The main execution method, which is called upon event arrival
     * when there is more than one function parameter.
     *
     * @param data the runtime values of function parameters
     * @return the function result
     */
    @Override
    protected Object execute(Object[] data) {
        return null;
    }

    /**
     * The main execution method, which is called upon event arrival
     * when there are zero or one function parameters.
     *
     * @param data null if the function parameter count is zero, or the
     *             runtime data value of the function parameter
     * @return the function result
     */
    @Override
    protected Object execute(Object data) {
        if (data != null) {
            // type-conversion
            if (data instanceof Integer) {
                int inputInt = (Integer) data;
                return Math.sin((double) inputInt);
            } else if (data instanceof Long) {
                long inputLong = (Long) data;
                return Math.sin((double) inputLong);
            } else if (data instanceof Float) {
                float inputFloat = (Float) data;
                return Math.sin((double) inputFloat);
            } else if (data instanceof Double) {
                return Math.sin((Double) data);
            }
        } else {
            throw new ExecutionPlanRuntimeException(
                    "Input to the math:sin() function cannot be null");
        }
        return null;
    }

    /**
     * This is called only once and can be used to acquire the
     * resources required by the processing element.
     * It is called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        // Implement start logic to acquire relevant resources
    }

    /**
     * This is called only once and can be used to release
     * the resources acquired for processing.
     * It is called before shutting down the system.
     */
    @Override
    public void stop() {
        // Implement stop logic to release the acquired resources
    }

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.DOUBLE;
    }

    /**
     * Used to collect the serializable state of the processing element that needs to be
     * persisted for reconstructing the element to the same state at a different point in time.
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return null;
    }

    /**
     * Used to restore the serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point in time.
     *
     * @param state the stateful objects of the element as an array, in
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        // Implement restore state logic.
    }
}
```
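The type-conversion branch of execute(Object) in the sample above can be tried without a Siddhi runtime. The following is a minimal sketch under stated assumptions, not part of the extension: the class name SinConversionSketch and the method sinOf are hypothetical, and a plain IllegalArgumentException stands in for ExecutionPlanRuntimeException so that the code compiles with the JDK alone.

```java
// Hypothetical standalone sketch; not part of the Siddhi API.
class SinConversionSketch {

    /**
     * Mirrors the type handling of execute(Object) in the sample:
     * INT, LONG, FLOAT and DOUBLE inputs are widened to double before Math.sin.
     */
    static Object sinOf(Object data) {
        if (data == null) {
            // The extension throws ExecutionPlanRuntimeException here; this sketch
            // uses a JDK exception because it has no Siddhi dependency.
            throw new IllegalArgumentException(
                    "Input to the math:sin() function cannot be null");
        }
        if (data instanceof Integer || data instanceof Long
                || data instanceof Float || data instanceof Double) {
            return Math.sin(((Number) data).doubleValue());
        }
        // Any other type falls through to null, as in the sample.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(sinOf(0));        // Integer input
        System.out.println(sinOf(2L));       // Long input
        System.out.println(sinOf(1.5f));     // Float input
        System.out.println(sinOf("text"));   // unsupported type
    }
}
```

In the real extension, unsupported types are already rejected in init(), so the null fall-through should only be reachable if the validation there is bypassed.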
The math.siddhiext extension mapping file for the custom function extension is shown below.
```properties
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
sin=org.wso2.siddhi.extension.math.SinFunctionExtension
```
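The mapping file uses a key=value layout with # comment lines, which happens to be readable by java.util.Properties. The sketch below relies on that assumption to check that a function name maps to the expected class before the jar is built. The class name SiddhiExtMappingSketch and the method classFor are hypothetical, and how Siddhi itself loads the file is not covered here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical standalone sketch; Siddhi's own extension loader is not shown.
class SiddhiExtMappingSketch {

    /**
     * Parses the text of a .siddhiext file and returns the class name
     * registered for the given function name, or null if there is none.
     */
    static String classFor(String mappingText, String functionName) {
        Properties mapping = new Properties();
        try {
            // Properties.load ignores lines starting with '#', such as the license header.
            mapping.load(new StringReader(mappingText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping.getProperty(functionName);
    }

    public static void main(String[] args) {
        String text = "# license header omitted\n"
                + "sin=org.wso2.siddhi.extension.math.SinFunctionExtension\n";
        System.out.println(classFor(text, "sin"));
    }
}
```

The key on the left of the equals sign is the function name used after the namespace in a query (sin in math:sin), and the value is the fully qualified name of the FunctionExecutor subclass.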