Unknown macro: {next_previous_link3}
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

To implement a custom Aggregate Function, create a class extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator" and create an appropriate .siddhiext extension mapping file, compile the class, and build the jar containing the .class and .siddhiext files. Add them to the Siddhi class path. In the case of running them on WSO2 DAS add the jar to <DAS_HOME>/repository/components/lib.

For example, Aggregate Function extension created with namespace "custom" and function name "std" can be referred in the query as follows: 

from StockExchangeStream[price >= 20] 
select symbol, custom:std(price) as stdPrice
insert into StockQuote;

E.g. Implementation can be found below;

/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.wso2.siddhi.core.query.selector.attribute.aggregator;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.OperationNotSupportedException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.util.Arrays;

public class StrandedDeviationAggregateFunction extends AttributeAggregator {

    private final Attribute.Type type = Attribute.Type.DOUBLE;
    private double mean, oldMean, stdDeviation, sum;
    private int count = 0;

    /**
     * The initialisation method for FunctionExecutor
     *
     * @param attributeExpressionExecutors are the executors of each attributes in the function
     * @param executionPlanContext         Execution plan runtime context
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        if (attributeExpressionExecutors.length != 1) {
            throw new OperationNotSupportedException("Stddev aggregator has to have exactly 1 parameter, currently " +
                    attributeExpressionExecutors.length + " parameters provided");
        }
    }
    
    @Override
    public Attribute.Type getReturnType() { return type; }

    @Override
    public Object processAdd(Object data) {
        count++;
        double value = (Double) data;

        if (count == 1) {
            sum = mean = oldMean = value;
            stdDeviation = 0.0;
        } else {
            oldMean = mean;
            sum += value;
            mean = sum / count;
            stdDeviation += (value - oldMean)*(value - mean);
        }

        if (count < 2) {
            return 0.0;
        }
        return Math.sqrt(stdDeviation / count);
    }

    @Override
    public Object processRemove(Object data) {
        count--;
        double value = (Double) data;

        if (count == 0) {
            sum = mean = 0;
            stdDeviation = 0;
        } else {
            oldMean = mean;
            sum -= value;
            mean = sum / count;
            stdDeviation -= (value - oldMean)*(value - mean);
        }

        if (count < 2) {
            return 0.0;
        }
        return Math.sqrt(stdDeviation / count);
    }

    @Override
    public Object reset() {
        sum = mean = oldMean = 0.0;
        stdDeviation = 0.0;
        count = 0;
        return 0;
    }

    @Override
    public Object processAdd(Object[] data) {
        return new IllegalStateException("Stddev cannot process data array, but found " + Arrays.deepToString(data));
    }

    @Override
    public Object processRemove(Object[] data) {
        return new IllegalStateException("Stddev cannot process data array, but found " + Arrays.deepToString(data));
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
    }

    /**
     * Used to collect the serialisable state of the processing element, that need to be
     * persisted for the reconstructing the element to the same state on a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[] {sum, mean, oldMean, stdDeviation, count}; 
    }

    /**
     * Used to restore serialised state of the processing element, for reconstructing
     * the element to the same state as if was on a previous point of time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        sum = (Double) state[0];
        mean = (Double) state[1];
        oldMean = (Double) state[2];
        stdDeviation = (Double) state[3];
        count = (Integer) state[4];
    }
}

  • No labels