A Siddhi Window extension allows events to be collected and expired, without altering the event format, based on the given input parameters, in the same way as the built-in Window operator.

To write a custom Window, create a class extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor" and create an appropriate .siddhiext extension mapping file. Compile the class, build a jar containing the .class and .siddhiext files, and add it to the Siddhi class path. When running on WSO2 DAS, add the jar to the <DAS_HOME>/repository/components/dropins directory.

For example, a Window extension created with namespace "custom" and function name "customWindow" can be referred to in a query as follows:

Code Block
languagesql
linenumberstrue
from TempStream#window.custom:customWindow(10)
select *
insert into AvgRoomTempStream;

To use the Window extension in a join query, the window must be findable. To make the window findable, the class should implement the "org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor" interface.
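As a rough illustration of what "findable" means, the sketch below models a window that can be searched for held events matching an incoming event from the other side of a join. It is plain Java with no Siddhi dependency: the class name FindableWindowSketch, its methods, and the "room:temperature" event strings are all hypothetical and are not part of the Siddhi API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

/** Plain-Java model of a findable window; hypothetical names, not Siddhi classes. */
public class FindableWindowSketch<E> {
    // Events currently held by the window
    private final List<E> heldEvents = new ArrayList<>();

    /** Stores an event in the window. */
    public void add(E event) {
        heldEvents.add(event);
    }

    /** Returns every held event that satisfies the join condition for the incoming event. */
    public <M> List<E> find(M matchingEvent, BiPredicate<M, E> condition) {
        List<E> matches = new ArrayList<>();
        for (E candidate : heldEvents) {
            if (condition.test(matchingEvent, candidate)) {
                matches.add(candidate);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        FindableWindowSketch<String> window = new FindableWindowSketch<>();
        window.add("room1:23");
        window.add("room2:30");
        window.add("room1:25");
        // Join condition (assumed for illustration): same room as the incoming event
        List<String> matched = window.find("room1",
                (room, candidate) -> candidate.startsWith(room + ":"));
        System.out.println(matched); // prints [room1:23, room1:25]
    }
}
```

In the real extension, the matching logic is supplied by Siddhi through the Finder passed to the find method; the predicate here only stands in for that role.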

A sample implementation of a custom window extension can be found below:

Code Block
languagejava
linenumberstrue
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.customWindow;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

import java.util.List;
import java.util.Map;

/**
 * Custom sliding length window implementation which holds the last length events, and gets updated on every event arrival and expiry.
 */
public class CustomWindow extends WindowProcessor implements FindableProcessor {
    private int length;
    private int count = 0;
    private ComplexEventChunk<StreamEvent> expiredEventChunk;

    /**
     * The init method of the WindowProcessor; this method will be called before the other methods
     *
     * @param attributeExpressionExecutors the executors of each function parameter
     * @param executionPlanContext         the context of the execution plan
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        expiredEventChunk = new ComplexEventChunk<StreamEvent>();
        if (attributeExpressionExecutors.length == 1) {
            length = (Integer) ((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue();
        } else {
            throw new ExecutionPlanValidationException("Length window should only have one parameter (<int> windowLength), but found " + attributeExpressionExecutors.length + " input attributes");
        }
    }

    /**
     * The main processing method that will be called upon event arrival
     *
     * @param streamEventChunk  the stream event chunk that needs to be processed
     * @param nextProcessor     the next processor to which the success events need to be passed
     * @param streamEventCloner helps to clone the incoming event for local storage or modification
     */
    @Override
    protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = streamEventChunk.next();
            StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
            clonedEvent.setType(StreamEvent.Type.EXPIRED);
            if (count < length) {
                // Window not yet full: hold the clone for later expiry
                count++;
                this.expiredEventChunk.add(clonedEvent);
            } else {
                // Window full: emit the oldest held event as expired, then hold the new clone
                StreamEvent firstEvent = this.expiredEventChunk.poll();
                if (firstEvent != null) {
                    streamEventChunk.insertBeforeCurrent(firstEvent);
                    this.expiredEventChunk.add(clonedEvent);
                } else {
                    streamEventChunk.insertBeforeCurrent(clonedEvent);
                }
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * To find events from the processor event pool that match the matchingEvent based on finder logic.
     *
     * @param matchingEvent the event to be matched with the events at the processor
     * @param finder        the execution element responsible for finding the corresponding events that match
     *                      the matchingEvent based on the pool of events at the processor
     * @return the matched events
     */
    @Override
    public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder finder) {
        return finder.find(matchingEvent, expiredEventChunk, streamEventCloner);
    }

    /**
     * To construct a finder having the capability of finding events at the processor that correspond to the incoming
     * matchingEvent and the given matching expression logic.
     *
     * @param expression                  the matching expression
     * @param metaComplexEvent            the meta structure of the incoming matchingEvent
     * @param executionPlanContext        current execution plan context
     * @param variableExpressionExecutors the list of variable ExpressionExecutors already created
     * @param eventTableMap               map of event tables
     * @param matchingStreamIndex         the stream index of the incoming matchingEvent
     * @param withinTime                  the maximum time gap between the events to be matched
     * @return finder having the capability of finding events at the processor against the expression and incoming
     * matchingEvent
     */
    @Override
    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that need to be
     * persisted for the reconstructing the element to the same state on a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[]{expiredEventChunk, count};
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as if was on a previous point of time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        expiredEventChunk = (ComplexEventChunk<StreamEvent>) state[0];
        count = (Integer) state[1];
    }
}
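The collect-and-expire behavior of the process method above can be easier to follow without the Siddhi event classes. The following is a minimal sketch in plain Java, assuming events are simple strings and that "EXPIRED:" and "CURRENT:" prefixes stand in for the event types; SlidingLengthSketch and its method names are hypothetical and not part of Siddhi.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java model of a sliding length window; hypothetical, not a Siddhi class. */
public class SlidingLengthSketch {
    private final int length;
    // Events currently held by the window, oldest first
    private final Deque<String> held = new ArrayDeque<>();

    public SlidingLengthSketch(int length) {
        if (length < 1) {
            throw new IllegalArgumentException("length must be at least 1");
        }
        this.length = length;
    }

    /**
     * Accepts one event and returns what would be passed downstream:
     * the evicted event first (if the window was full), then the new event.
     */
    public List<String> process(String event) {
        List<String> output = new ArrayList<>();
        if (held.size() >= length) {
            // Window is full: the oldest event expires before the new one is emitted
            output.add("EXPIRED:" + held.pollFirst());
        }
        held.addLast(event);
        output.add("CURRENT:" + event);
        return output;
    }

    public static void main(String[] args) {
        SlidingLengthSketch window = new SlidingLengthSketch(2);
        for (String event : new String[]{"a", "b", "c", "d"}) {
            System.out.println(window.process(event));
        }
        // prints:
        // [CURRENT:a]
        // [CURRENT:b]
        // [EXPIRED:a, CURRENT:c]
        // [EXPIRED:b, CURRENT:d]
    }
}
```

As in the sample extension, the expired event is placed ahead of the current event, so downstream consumers see the removal before the addition.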

A sample custom.siddhiext extension mapping file for the custom window extension can be found below:

Code Block
linenumberstrue
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
customWindow=org.wso2.siddhi.extension.customWindow.CustomWindow
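To show how a query reference such as custom:customWindow relates to this file, the sketch below resolves a reference to a class name. It assumes the mapping file can be read as ordinary Java properties (name=class pairs, with # comment lines) and that the file name before .siddhiext is the namespace. The SiddhiExtMappingSketch class and its resolve method are hypothetical; this does not show how Siddhi itself loads extensions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Illustrative lookup of "namespace:name" in a .siddhiext-style mapping; hypothetical helper. */
public class SiddhiExtMappingSketch {

    /**
     * Returns the class name mapped to the given reference, or null if the
     * namespace does not match or the name is not listed in the mapping.
     */
    public static String resolve(String namespace, String mappingFileContents, String reference) {
        String prefix = namespace + ":";
        if (!reference.startsWith(prefix)) {
            return null;
        }
        Properties mapping = new Properties();
        try {
            // Assumption: the mapping file parses as standard Java properties
            mapping.load(new StringReader(mappingFileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping.getProperty(reference.substring(prefix.length()));
    }

    public static void main(String[] args) {
        String contents = "# sample mapping\n"
                + "customWindow=org.wso2.siddhi.extension.customWindow.CustomWindow\n";
        System.out.println(resolve("custom", contents, "custom:customWindow"));
        // prints org.wso2.siddhi.extension.customWindow.CustomWindow
    }
}
```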