A custom Siddhi Aggregate Function consumes zero or more parameters for each event and outputs a single attribute holding an aggregated result based on the input parameters. It can be used in conjunction with a window to compute the aggregated result over that window, in the same way as the built-in Aggregate Function operators.
To implement a custom Aggregate Function, create a class extending "org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator", create an appropriate .siddhiext extension mapping file, compile the class, and build a jar containing the .class and .siddhiext files. Add the jar to the Siddhi class path. When running on WSO2 DAS, add the jar to the <DAS_HOME>/repository/components/dropins directory.
For example, an Aggregate Function extension created with namespace "custom" and function name "count" can be referred to in a query as follows:
Code Block | ||||
---|---|---|---|---|
| ||||
from pizzaOrder#window.length(20) select custom:count(orderNo) as totalOrders insert into orderCount; |
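To make the interaction between the window and the aggregate function concrete, here is a minimal plain-Java sketch with no Siddhi dependency. `WindowCountSketch`, `CountAggregate`, and `countsOverWindow` are hypothetical names introduced only for illustration, and the sketch assumes that the oldest event is removed from the aggregate before the newly arrived event is added once the window is full.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Plain-Java sketch (not the Siddhi API) of a count aggregate over a length window. */
class WindowCountSketch {

    /** Hypothetical stand-in for the add/remove contract of an aggregate function. */
    static class CountAggregate {
        private long value = 0L;

        long processAdd(Object data) {
            value++;
            return value;
        }

        long processRemove(Object data) {
            value--;
            return value;
        }
    }

    /** Returns the count emitted after each arriving event for a window of the given length. */
    static List<Long> countsOverWindow(List<Integer> orderNos, int windowLength) {
        Deque<Integer> window = new ArrayDeque<>();
        CountAggregate count = new CountAggregate();
        List<Long> emitted = new ArrayList<>();
        for (Integer orderNo : orderNos) {
            if (window.size() == windowLength) {
                // Assumption: the expired event leaves the aggregate before the new one enters.
                count.processRemove(window.removeFirst());
            }
            window.addLast(orderNo);
            emitted.add(count.processAdd(orderNo));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // With a window of length 3 the count grows to 3 and then stays there.
        System.out.println(countsOverWindow(Arrays.asList(101, 102, 103, 104, 105), 3));
        // prints [1, 2, 3, 3, 3]
    }
}
```

In the query above the window length is 20, so under the same assumption `totalOrders` would rise to 20 and then stay at 20 as older orders expire.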
A sample implementation of a custom aggregate function extension is shown below:
Code Block | ||||
---|---|---|---|---|
| ||||
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.wso2.siddhi.extension.customAggregateFunction;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute;

/**
 * Custom Count Extension which returns event count as a long
 */
public class CountAggregateFunction extends AttributeAggregator {

    private static Attribute.Type type = Attribute.Type.LONG;
    private long value = 0L;

    /**
     * The initialization method for CountAggregateFunction
     *
     * @param attributeExpressionExecutors are the executors of each attribute in the function
     * @param executionPlanContext         execution plan runtime context
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        //Implement class specific initialization
    }

    /**
     * The process add method of the CountAggregateFunction, used when zero or one function parameter is provided
     *
     * @param data null if the function parameter count is zero or runtime data value of the function parameter
     * @return the count value
     */
    @Override
    public Object processAdd(Object data) {
        value++;
        return value;
    }

    /**
     * The process add method of the CountAggregateFunction, used when more than one function parameter is provided
     *
     * @param data the data values for the function parameters
     * @return the count value
     */
    @Override
    public Object processAdd(Object[] data) {
        value++;
        return value;
    }

    /**
     * The process remove method of the CountAggregateFunction, used when zero or one function parameter is provided
     *
     * @param data null if the function parameter count is zero or runtime data value of the function parameter
     * @return the count value
     */
    @Override
    public Object processRemove(Object data) {
        value--;
        return value;
    }

    /**
     * The process remove method of the CountAggregateFunction, used when more than one function parameter is provided
     *
     * @param data the data values for the function parameters
     * @return the count value
     */
    @Override
    public Object processRemove(Object[] data) {
        value--;
        return value;
    }

    /**
     * Reset count value
     *
     * @return reset value
     */
    @Override
    public Object reset() {
        value = 0L;
        return value;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that needs to be
     * persisted for reconstructing the element to the same state at a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[]{value};
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point of time.
     *
     * @param state the stateful objects of the element as an array in
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        value = (Long) state[0];
    }

    public Attribute.Type getReturnType() {
        return type;
    }
} |
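The `currentState()` and `restoreState()` pair in the sample is what allows the count to survive persistence and recovery. The following plain-Java sketch mirrors only the state-handling logic of the sample, without extending the Siddhi base class; `CountStateSketch` and `restoreAndAdd` are hypothetical names used for illustration.

```java
/** Plain-Java sketch (not the Siddhi API) of snapshotting and restoring the count state. */
class CountStateSketch {

    /** Mirrors the counting and state logic of the sample aggregate function. */
    static class CountAggregate {
        private long value = 0L;

        Object processAdd(Object data) {
            value++;
            return value;
        }

        Object processRemove(Object data) {
            value--;
            return value;
        }

        Object[] currentState() {
            // The long is boxed to a Long inside the state array.
            return new Object[]{value};
        }

        void restoreState(Object[] state) {
            // Elements are read back in the order currentState() wrote them.
            value = (Long) state[0];
        }
    }

    /** Adds three events, removes one, snapshots, restores into a fresh instance, adds one more. */
    static long restoreAndAdd() {
        CountAggregate original = new CountAggregate();
        original.processAdd("a");
        original.processAdd("b");
        original.processAdd("c");
        original.processRemove("a");
        Object[] snapshot = original.currentState();

        CountAggregate restored = new CountAggregate();
        restored.restoreState(snapshot);
        return (Long) restored.processAdd("d");
    }

    public static void main(String[] args) {
        System.out.println(restoreAndAdd()); // prints 3
    }
}
```

The restored instance continues from the snapshot value of 2 rather than from 0, which is the behaviour the two state methods are meant to provide.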
A sample custom.siddhiext extension mapping file for the custom aggregate function extension is shown below:
Code Block | ||
---|---|---|
| ||
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
std=org.wso2.siddhi.core.query.selector.attribute.aggregator.StrandedDeviationAggregateFunction |
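The mapping file pairs a function name with the fully qualified class that implements it, one `name=class` entry per line, with `#` comment lines. As a rough illustration, the sketch below reads such content with `java.util.Properties`. It assumes the file follows Java properties syntax, which the sample above is compatible with; it does not show how Siddhi itself loads the file. `SiddhiExtMappingSketch` and `resolve` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Sketch: look up the implementing class for a function name in mapping-file content. */
class SiddhiExtMappingSketch {

    /** Returns the class name mapped to functionName, or null if there is no such entry. */
    static String resolve(String mappingFileContent, String functionName) {
        Properties mapping = new Properties();
        try {
            // Assumption: the content uses Java properties syntax (name=class, '#' comments).
            mapping.load(new StringReader(mappingFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping.getProperty(functionName);
    }

    public static void main(String[] args) {
        String content = "# licence header lines are comments\n"
                + "std=org.wso2.siddhi.core.query.selector.attribute.aggregator.StrandedDeviationAggregateFunction\n";
        System.out.println(resolve(content, "std"));
        System.out.println(resolve(content, "count")); // prints null, no such entry
    }
}
```

Under this reading, a function is only reachable from a query if the mapping file contains an entry for its name, so the entry name and the name used in the query need to match.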