To write a custom function, create a class extending "org.wso2.siddhi.core.executor.function.FunctionExecutor", add the SiddhiExtionsion annotation, compile that class, and add the jar file to the class path <DAS_HOME>/repository/components/lib. Then add the fully-qualified class name for the implementation class in a new line, to the siddhi.extension file located at <DAS_HOME>/repository/conf/siddhi.
For example, if you have created the extension with namespace custom and function name plus, you can use that in the query as follows:
from InMediationStatsStream select meta_host,timestamp,resource_id,direction,fault_count,custom:plus(fault_count,count) as totalCount insert into OutMediationStatsStream;
import org.apache.log4j.Logger; import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.exception.QueryCreationException; import org.wso2.siddhi.core.executor.function.FunctionExecutor; import org.wso2.siddhi.query.api.definition.Attribute; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; @SiddhiExtension(namespace = "custom", function = "plus") public class CustomFunctionExtension extends FunctionExecutor { Logger log = Logger.getLogger(CustomFunctionExtension.class); Attribute.Type returnType; /** * Method will be called when initialising the custom function * * @param types * @param siddhiContext */ @Override public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { for (Attribute.Type attributeType : types) { if (attributeType == Attribute.Type.DOUBLE) { returnType = attributeType; break; } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) { throw new QueryCreationException("Plus cannot have parameters with types String or Bool"); } else { returnType = Attribute.Type.LONG; } } } /** * Method called when sending events to process * * @param obj * @return */ @Override protected Object process(Object obj) { if (returnType == Attribute.Type.DOUBLE) { double total = 0; if (obj instanceof Object[]) { for (Object aObj : (Object[]) obj) { total += Double.parseDouble(String.valueOf(aObj)); } } return total; } else { long total = 0; if (obj instanceof Object[]) { for (Object aObj : (Object[]) obj) { total += Long.parseLong(String.valueOf(aObj)); } } return total; } } @Override public void destroy() { } /** * Return type of the custom function mentioned * * @return */ @Override public Attribute.Type getReturnType() { return returnType; } }
Sample project file can be downloaded from here.