The Siddhi Function Extension consumes zero or more parameters for each event and outputs a single attribute. It can be used to manipulate event attributes to generate new attributes, in the same way as the Function operator.
To implement a custom function extension, follow the procedure below.
- Create a class extending org.wso2.siddhi.core.executor.function.FunctionExecutor.
- Create an appropriate .siddhiext extension mapping file.
- Compile the class.
- Build the jar containing the .class and the .siddhiext files.
- Add the jar to the Siddhi class path. If you need to run the extension on WSO2 DAS, add the jar to the <DAS_HOME>/repository/components/dropins directory.
For example, a custom function extension created with math as the namespace and sin as the function name can be referred to in a query as shown below.
Code Block | ||||
---|---|---|---|---|
| ||||
from InValueStream
select math:sin(inValue) as sinValue
insert into OutMediationStream; |
Info | ||
---|---|---|
| ||
From CEP 3.0.0 onwards, FunctionExecutor is supposed to be used for writing both custom expressions and conditions. |
The following is a sample implementation of a custom function extension.
Code Block | ||||
---|---|---|---|---|
| ||||
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.wso2.siddhi.extension.math;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;

/*
 * sin(a);
 * Returns the sine of a (a is in radians).
 * Accept Type(s): DOUBLE/INT/FLOAT/LONG
 * Return Type(s): DOUBLE
 */
public class SinFunctionExtension extends FunctionExecutor {

    /**
     * The initialization method for SinFunctionExtension,
     * this method will be called before the other methods
     *
     * @param attributeExpressionExecutors the executors of each function parameter
     * @param executionPlanContext         the context of the execution plan
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors,
                        ExecutionPlanContext executionPlanContext) {
        if (attributeExpressionExecutors.length != 1) {
            throw new ExecutionPlanValidationException(
                    "Invalid no of arguments passed to math:sin() function, "
                            + "required 1, but found " + attributeExpressionExecutors.length);
        }
        Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();
        if (!((attributeType == Attribute.Type.DOUBLE)
                || (attributeType == Attribute.Type.INT)
                || (attributeType == Attribute.Type.FLOAT)
                || (attributeType == Attribute.Type.LONG))) {
            throw new ExecutionPlanValidationException(
                    "Invalid parameter type found for the argument of math:sin() function, "
                            + "required " + Attribute.Type.INT + " or " + Attribute.Type.LONG
                            + " or " + Attribute.Type.FLOAT + " or " + Attribute.Type.DOUBLE
                            + ", but found " + attributeType.toString());
        }
    }

    /**
     * The main execution method which will be called upon event arrival
     * when there are more than one function parameter
     *
     * @param data the runtime values of function parameters
     * @return the function result
     */
    @Override
    protected Object execute(Object[] data) {
        // math:sin() takes exactly one parameter (enforced in init), so this overload is not used.
        return null;
    }

    /**
     * The main execution method which will be called upon event arrival
     * when there are zero or one function parameter
     *
     * @param data null if the function parameter count is zero or
     *             runtime data value of the function parameter
     * @return the function result
     */
    @Override
    protected Object execute(Object data) {
        if (data != null) {
            // type-conversion
            if (data instanceof Integer) {
                int inputInt = (Integer) data;
                return Math.sin((double) inputInt);
            } else if (data instanceof Long) {
                long inputLong = (Long) data;
                return Math.sin((double) inputLong);
            } else if (data instanceof Float) {
                float inputFloat = (Float) data;
                return Math.sin((double) inputFloat);
            } else if (data instanceof Double) {
                return Math.sin((Double) data);
            }
        } else {
            throw new ExecutionPlanRuntimeException("Input to the math:sin() function cannot be null");
        }
        return null;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        // Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        // Implement stop logic to release the acquired resources
    }

    @Override
    public Attribute.Type getReturnType() {
        return Attribute.Type.DOUBLE;
    }

    /**
     * Used to collect the serializable state of the processing element, that needs to be
     * persisted for reconstructing the element to the same state at a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return null;
    }

    /**
     * Used to restore the serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point of time.
     *
     * @param state the stateful objects of the element as an array, in
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        // Implement restore state logic.
    }
} |
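The type-conversion step in execute(Object data) relies on the fact that Integer, Long, Float, and Double all extend java.lang.Number, so the four branches can be collapsed into one conversion. The following is a minimal standalone sketch of that logic only. The class name SinConversionSketch and the method sinOf are hypothetical names used for illustration, and the sketch deliberately avoids the Siddhi classes so that it runs without the Siddhi jars: it throws IllegalArgumentException where the extension throws ExecutionPlanRuntimeException.

```java
// Standalone sketch (not part of Siddhi): mirrors the type handling of
// SinFunctionExtension.execute(Object data) using only the JDK.
class SinConversionSketch {

    // Hypothetical helper; returns a Double for the four accepted types,
    // null for any other non-null type, and rejects null input.
    static Object sinOf(Object data) {
        if (data == null) {
            // The extension throws ExecutionPlanRuntimeException here.
            throw new IllegalArgumentException("Input to the math:sin() function cannot be null");
        }
        if (data instanceof Integer || data instanceof Long
                || data instanceof Float || data instanceof Double) {
            // All four accepted types are Number subclasses, so a single
            // widening conversion to double covers every branch.
            return Math.sin(((Number) data).doubleValue());
        }
        // Other types are rejected by init() in the extension; returning null
        // keeps the same fall-through behaviour as the sample.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(sinOf(0));
        System.out.println(sinOf(2L));
        System.out.println(sinOf(1.5f));
        System.out.println(sinOf(Math.PI / 6));
    }
}
```

Whether to keep the explicit per-type branches or the single Number conversion is a matter of style; the results are the same for the four accepted types.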
A sample math.siddhiext extension mapping file for the custom function extension is shown below.
Code Block |
---|
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
sin=org.wso2.siddhi.extension.math.SinFunctionExtension |
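Each entry in the mapping file pairs a function name with the fully qualified class that implements it, and lines starting with # are comments. The following sketch illustrates that format by reading such content with java.util.Properties, which happens to accept the same key=value and # comment syntax. This is an assumption made for illustration only: the document does not describe how Siddhi itself loads .siddhiext files, and the class SiddhiExtMappingSketch and method classFor are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustration of the name=class mapping format only; not Siddhi's loader.
class SiddhiExtMappingSketch {

    // Hypothetical helper: returns the class name mapped to functionName,
    // or null if the mapping content has no such entry.
    static String classFor(String mappingFileContent, String functionName) {
        Properties mapping = new Properties();
        try {
            // Properties.load skips lines that start with '#'.
            mapping.load(new StringReader(mappingFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping.getProperty(functionName);
    }

    public static void main(String[] args) {
        String content = "#\n"
                + "# license header omitted in this sketch\n"
                + "#\n"
                + "sin=org.wso2.siddhi.extension.math.SinFunctionExtension\n";
        // Prints the class mapped to the sin function.
        System.out.println(classFor(content, "sin"));
    }
}
```

A quick check like this can help catch a typo in the class name before the jar is built.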