To implement a custom function extension, create a class that extends "org.wso2.siddhi.core.executor.function.FunctionExecutor", create an appropriate .siddhiext extension mapping file, compile the class, and build a JAR containing the .class and .siddhiext files.
Add the JAR to the Siddhi class path. If the extension runs on WSO2 DAS, add the JAR to the <DAS_HOME>/repository/components/dropins directory.
For example, if you have created the extension with the namespace "custom" and the function name "plus", you can use it in a query as follows:
from InMediationStatsStream select meta_host,timestamp,resource_id,direction,fault_count,custom:plus(fault_count,count) as totalCount insert into OutMediationStatsStream;
import org.apache.log4j.Logger; import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.exception.QueryCreationException; import org.wso2.siddhi.core.executor.function.FunctionExecutor; import org.wso2.siddhi.query.api.definition.Attribute; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; @SiddhiExtension(namespace = "custom", function = "plus") public class CustomFunctionExtension extends FunctionExecutor { Logger log = Logger.getLogger(CustomFunctionExtension.class); Attribute.Type returnType; /** * Method will be called when initialising the custom function * * @param types * @param siddhiContext */ @Override public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { for (Attribute.Type attributeType : types) { if (attributeType == Attribute.Type.DOUBLE) { returnType = attributeType; break; } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) { throw new QueryCreationException("Plus cannot have parameters with types String or Bool"); } else { returnType = Attribute.Type.LONG; } } } /** * Method called when sending events to process * * @param obj * @return */ @Override protected Object process(Object obj) { if (returnType == Attribute.Type.DOUBLE) { double total = 0; if (obj instanceof Object[]) { for (Object aObj : (Object[]) obj) { total += Double.parseDouble(String.valueOf(aObj)); } } return total; } else { long total = 0; if (obj instanceof Object[]) { for (Object aObj : (Object[]) obj) { total += Long.parseLong(String.valueOf(aObj)); } } return total; } } @Override public void destroy() { } /** * Return type of the custom function mentioned * * @return */ @Override public Attribute.Type getReturnType() return returnType; } }
Sample project file can be downloaded from here.