Siddhi Aggregate Function consumes zero or more parameters for each event and output a single attribute having an aggregated results based in the input parameters as an output. This could be used with conjunction with a window in order to find the aggregated results based on the given window like Aggregate Function operator.
To implement a custom Aggregate Function, create a class extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator" and create an appropriate .siddhiext extension mapping file, compile the class, and build the jar containing the .class and .siddhiext files. Add them to the Siddhi class path. In the case of running them on WSO2 DAS add the jar to the <DAS_HOME>/repository/components/
libdropins
directory.
For example, Aggregate Function extension created with namespace "custom" and function name "std" can be referred in the query as follows:
Code Block |
---|
language | sql |
---|
linenumbers | true |
---|
|
from StockExchangeStream[price >= 20]
select symbol, custom:std(pricepizzaOrder#window.length(20)
select custom:count(orderNo) as stdPricetotalOrders
insert into StockQuoteorderCount; |
E.g. Implementation can Sample implementation of a custom aggregate function extension can be found below;
Code Block |
---|
language | java |
---|
linenumbers | true |
---|
|
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); * you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
software * *software distributed under the License is distributed on an
* "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. * See the License for the
* specific language governing permissions and limitations
* limitations under the License.
*/
package org.wso2.siddhi.core.query.selector.attribute.aggregatorextension.customAggregateFunction;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exceptionexecutor.OperationNotSupportedExceptionExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutorquery.selector.attribute.aggregator.AttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute;
/**
* Custom Count Extension which
import java.util.Arrays;
returns event count as a long
*/
public class StrandedDeviationAggregateFunctionCountAggregateFunction extends AttributeAggregator {
private finalstatic Attribute.Type type = Attribute.Type.DOUBLELONG;
private double mean, oldMean, stdDeviation, sum;
private int countlong value = 00l;
/**
* The initialisationinitialization method for FunctionExecutorCountAggregateFunction
*
* @param attributeExpressionExecutors are the executors of each attributes in the function
* @param executionPlanContext Execution plan runtime context
*/
@Override
protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
//Implement class specific initialization
if}
(attributeExpressionExecutors.length
!= 1) { /**
* The process add method throwof new OperationNotSupportedException("Stddev aggregator has to have exactly 1 parameter, currently " +the CountAggregateFunction, used when zero or one function parameter is provided
*
* @param data null if the function parameter count is attributeExpressionExecutors.lengthzero +or "runtime parameters provided");
data value of the function parameter
} * @return the count }value
*/
@Override public Attribute.Type getReturnType() { return type; }
@Override
public Object processAdd(Object data) {
countvalue++;
doublereturn value;
= (Double) data; }
/**
if (count == 1)* {The process add method of the CountAggregateFunction, used when more than one function sumparameters =are meanprovided
= oldMean = value; *
* @param data the data stdDeviationvalues = 0.0;
for the function parameters
* }@return elsethe {count value
*/
@Override
oldMean = mean; public Object processAdd(Object[] data) {
sum += value++;
mean = sum / countreturn value;
}
/**
stdDeviation += (value - oldMean)*(value -The mean);process remove method of the CountAggregateFunction, used when zero }or one function parameter is provided
if (count*
< 2) { * @param data null if the function parameter count is return 0.0;
}zero or runtime data value of the function parameter
* @return the count value
return Math.sqrt(stdDeviation / count); */
} @Override
public Object processRemove(Object data) {
countvalue--;
doublereturn value = (Double) data;
if (count == 0) {
sum = mean = 0;
stdDeviation = 0;
}
else
{ /**
* The process oldMeanremove =method mean;of the CountAggregateFunction, used when more than one function parameters are provided
sum -= value; *
* @param data the data meanvalues =for sumthe /function count;parameters
stdDeviation -= (value - oldMean)*(value -@return mean);
}
if (count < 2) {
return 0.0;
}
return Math.sqrt(stdDeviation / count);
}
the count value
*/
@Override
public Object resetprocessRemove(Object[] data) {
sum = mean = oldMean = 0.0;
stdDeviation = 0.0;
count = 0value--;
return 0value;
}
@Override/**
public Object processAdd(Object[] data) {* Reset count value
*
return new IllegalStateException("Stddev cannot* process@return datareset array,value
but found " + Arrays.deepToString(data));
} */
@Override
public Object processRemovereset(Object[] data) {
return new IllegalStateException("Stddev cannot process data array, but found " + Arrays.deepToString(data))value = 0l;
return value;
}
/**
* This will be called only once and this can be used to acquire
* required resources for the processing element.
* This will be called after initializing the system and before
* starting to process the events.
*/
@Override
public void start() {
//Implement start logic to acquire relevant resources
}
/**
* This will be called only once and this can be used to release
* the acquired resources for processing.
* This will be called before shutting down the system.
*/
@Override
public void stop() {
//Implement stop logic to release the acquired resources
}
/**
* Used to collect the serialisableserializable state of the processing element, that need to be
* persisted for the reconstructing the element to the same state on a different point of time
*
* @return stateful objects of the processing element as an array
*/
@Override
public Object[] currentState() {
return new Object[] {sum, mean, oldMean, stdDeviation, count{value};
}
/**
* Used to restore serialisedserialized state of the processing element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
sumvalue = (DoubleLong) state[0];
}
mean = (Double) state[1] public Attribute.Type getReturnType() {
return type;
}
}
oldMean = (Double) state[2];
stdDeviation = (Double) state[3];
count = (Integer) state[4];
}
}
|
Sample custom.siddhiext extension mapping file for the custom aggregate function extension can be found below;
Code Block |
---|
|
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
std=org.wso2.siddhi.core.query.selector.attribute.aggregator.StrandedDeviationAggregateFunction |