To implement a custom aggregate function, create a class that extends "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator" and create an appropriate .siddhiext extension mapping file. Compile the class and build a jar containing the .class and .siddhiext files, then add the jar to the Siddhi class path. When running on WSO2 DAS, add the jar to <DAS_HOME>/repository/components/lib.
For example, an aggregate function extension created with the namespace "custom" and the function name "std" can be referred to in a query as follows:
from StockExchangeStream[price >= 20] select symbol, custom:std(price) as stdPrice insert into StockQuote;
An example implementation is shown below:
/* * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.wso2.siddhi.core.query.selector.attribute.aggregator; import org.wso2.siddhi.core.config.ExecutionPlanContext; import org.wso2.siddhi.core.exception.OperationNotSupportedException; import org.wso2.siddhi.core.executor.ExpressionExecutor; import org.wso2.siddhi.query.api.definition.Attribute; import java.util.Arrays; public class StrandedDeviationAggregateFunction extends AttributeAggregator { private final Attribute.Type type = Attribute.Type.DOUBLE; private double mean, oldMean, stdDeviation, sum; private int count = 0; /** * The initialisation method for FunctionExecutor * * @param attributeExpressionExecutors are the executors of each attributes in the function * @param executionPlanContext Execution plan runtime context */ @Override protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { if (attributeExpressionExecutors.length != 1) { throw new OperationNotSupportedException("Stddev aggregator has to have exactly 1 parameter, currently " + attributeExpressionExecutors.length + " parameters provided"); } } @Override public Attribute.Type getReturnType() { return type; } @Override public Object processAdd(Object data) { count++; double value = (Double) data; if (count == 1) { sum = mean = oldMean = value; stdDeviation = 0.0; } else { oldMean = mean; sum += value; mean = sum / 
count; stdDeviation += (value - oldMean)*(value - mean); } if (count < 2) { return 0.0; } return Math.sqrt(stdDeviation / count); } @Override public Object processRemove(Object data) { count--; double value = (Double) data; if (count == 0) { sum = mean = 0; stdDeviation = 0; } else { oldMean = mean; sum -= value; mean = sum / count; stdDeviation -= (value - oldMean)*(value - mean); } if (count < 2) { return 0.0; } return Math.sqrt(stdDeviation / count); } @Override public Object reset() { sum = mean = oldMean = 0.0; stdDeviation = 0.0; count = 0; return 0; } @Override public Object processAdd(Object[] data) { return new IllegalStateException("Stddev cannot process data array, but found " + Arrays.deepToString(data)); } @Override public Object processRemove(Object[] data) { return new IllegalStateException("Stddev cannot process data array, but found " + Arrays.deepToString(data)); } /** * This will be called only once and this can be used to acquire * required resources for the processing element. * This will be called after initializing the system and before * starting to process the events. */ @Override public void start() { } /** * This will be called only once and this can be used to release * the acquired resources for processing. * This will be called before shutting down the system. */ @Override public void stop() { } /** * Used to collect the serialisable state of the processing element, that need to be * persisted for the reconstructing the element to the same state on a different point of time * * @return stateful objects of the processing element as an array */ @Override public Object[] currentState() { return new Object[] {sum, mean, oldMean, stdDeviation, count}; } /** * Used to restore serialised state of the processing element, for reconstructing * the element to the same state as if was on a previous point of time. * * @param state the stateful objects of the element as an array on * the same order provided by currentState(). 
*/ @Override public void restoreState(Object[] state) { sum = (Double) state[0]; mean = (Double) state[1]; oldMean = (Double) state[2]; stdDeviation = (Double) state[3]; count = (Integer) state[4]; } }