Writing a Custom Window Extension
The Siddhi Window Extension allows events to be collected and expired without altering the event format based on the given input parameters such as the Window operator.
To write a custom window, follow the procedure below.
- Create a class extendingÂ
org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
. - Create an appropriate
.siddhiext
extension mapping file. - Compile the class.
- Build the jar containing the
.class
and the.siddhiext
files. - Add the jar to the Siddhi class path. If you need to run the extension on WSO2 CEP, add the jar to theÂ
<CEP_HOME>/repository/components/dropins
directory.
For example, a window
extension created with custom
as the namespace and customWindow
as the function name can be referred in a query as shown below.Â
from TempStream#window.custom:customWindow(10) select * insert into AvgRoomTempStream ;
For the window
extension to be used in a join
query, it should be possible to find the window
extension. To enable this, the org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor
interface should be implemented.
The following is a sample implementation of a custom window extension.
/* * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * WSO2 Inc. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.wso2.siddhi.extension.customWindow; import org.wso2.siddhi.core.config.ExecutionPlanContext; import org.wso2.siddhi.core.event.ComplexEvent; import org.wso2.siddhi.core.event.ComplexEventChunk; import org.wso2.siddhi.core.event.MetaComplexEvent; import org.wso2.siddhi.core.event.stream.StreamEvent; import org.wso2.siddhi.core.event.stream.StreamEventCloner; import org.wso2.siddhi.core.executor.ConstantExpressionExecutor; import org.wso2.siddhi.core.executor.ExpressionExecutor; import org.wso2.siddhi.core.executor.VariableExpressionExecutor; import org.wso2.siddhi.core.query.processor.Processor; import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor; import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor; import org.wso2.siddhi.core.table.EventTable; import org.wso2.siddhi.core.util.collection.operator.Finder; import org.wso2.siddhi.core.util.parser.CollectionOperatorParser; import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException; import org.wso2.siddhi.query.api.expression.Expression; import java.util.List; import java.util.Map; /** * Custom Sliding Length Window implementation which holds last length events, and gets updated on every event arrival and expiry. */ public class CustomWindow extends WindowProcessor implements FindableProcessor { private int length; private int count = 0; private ComplexEventChunk<StreamEvent> expiredEventChunk; /** * The init method of the WindowProcessor, this method will be called before other methods * * @param attributeExpressionExecutors the executors of each function parameters * @param executionPlanContext the context of the execution plan */ @Override protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { expiredEventChunk = new ComplexEventChunk<StreamEvent>(); if (attributeExpressionExecutors.length == 1) { length = (Integer) ((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue(); } else { throw new ExecutionPlanValidationException("Length window should only have one parameter (<int> windowLength), but found " + attributeExpressionExecutors.length + " input attributes"); } } /** * The main processing method that will be called upon event arrival * * @param streamEventChunk the stream event chunk that need to be processed * @param nextProcessor the next processor to which the success events need to be passed * @param streamEventCloner helps to clone the incoming event for local storage or modification */ @Override protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) { while (streamEventChunk.hasNext()) { StreamEvent streamEvent = streamEventChunk.next(); StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent); clonedEvent.setType(StreamEvent.Type.EXPIRED); if (count < length) { count++; this.expiredEventChunk.add(clonedEvent); } else { StreamEvent firstEvent = this.expiredEventChunk.poll(); if (firstEvent != null) { streamEventChunk.insertBeforeCurrent(firstEvent); this.expiredEventChunk.add(clonedEvent); } else { streamEventChunk.insertBeforeCurrent(clonedEvent); } } } nextProcessor.process(streamEventChunk); } /** * To find events from the processor event pool, that the matches the matchingEvent based on finder logic. * * @param matchingEvent the event to be matched with the events at the processor * @param finder the execution element responsible for finding the corresponding events that matches * the matchingEvent based on pool of events at Processor * @return the matched events */ @Override public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder finder) { return finder.find(matchingEvent, expiredEventChunk, streamEventCloner); } /** * To construct a finder having the capability of finding events at the processor that corresponds to the incoming * matchingEvent and the given matching expression logic. * * @param expression the matching expression * @param metaComplexEvent the meta structure of the incoming matchingEvent * @param executionPlanContext current execution plan context * @param variableExpressionExecutors the list of variable ExpressionExecutors already created * @param eventTableMap map of event tables * @param matchingStreamIndex the stream index of the incoming matchingEvent * @param withinTime the maximum time gap between the events to be matched * @return finder having the capability of finding events at the processor against the expression and incoming * matchingEvent */ @Override public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) { return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime); } /** * This will be called only once and this can be used to acquire * required resources for the processing element. * This will be called after initializing the system and before * starting to process the events. */ @Override public void start() { //Implement start logic to acquire relevant resources } /** * This will be called only once and this can be used to release * the acquired resources for processing. * This will be called before shutting down the system. */ @Override public void stop() { //Implement stop logic to release the acquired resources } /** * Used to collect the serializable state of the processing element, that need to be * persisted for the reconstructing the element to the same state on a different point of time * * @return stateful objects of the processing element as an array */ @Override public Object[] currentState() { return new Object[]{expiredEventChunk, count}; } /** * Used to restore serialized state of the processing element, for reconstructing * the element to the same state as if was on a previous point of time. * * @param state the stateful objects of the element as an array on * the same order provided by currentState(). */ @Override public void restoreState(Object[] state) { expiredEventChunk = (ComplexEventChunk<StreamEvent>) state[0]; count = (Integer) state[1]; } }
Sample custom.siddhiext extension mapping file for the  custom window extension can be found below;
# # Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. # # WSO2 Inc. licenses this file to you under the Apache License, # Version 2.0 (the "License"); you may not use this file except # in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # customWindow=org.wso2.siddhi.extension.customWindow.CustomWindow