Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

A Siddhi Window Extension allows events to be collected and expired based on the given input parameters, without altering the event format, just like the built-in Window operator.

To write a custom Window, create a class extending "org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor" and an appropriate .siddhiext extension mapping file, compile the class, and build a jar containing the .class and .siddhiext files. Add the jar to the Siddhi class path. When running on WSO2 DAS, add the jar to the <DAS_HOME>/repository/components/dropins directory.

For example, a Window extension created with the namespace "custom" and the function name "customWindow" can be referred to in a query as follows:

Code Block
languagesql
linenumberstrue
-- Keep the 10 most recent TempStream events in the custom sliding length window
-- and forward every attribute of each event to AvgRoomTempStream.
from TempStream#window.custom:customWindow(10)
select *
insert into AvgRoomTempStream;

For the Window extension to be used in a Join Query, it must be findable. To make the Window findable, it should implement the "org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor" interface.

A sample implementation of a custom window extension can be found below:

Code Block
languagejava
linenumberstrue
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.customWindow;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor;
import org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParser;
import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
import org.wso2.siddhi.query.api.expression.Expression;

import java.util.List;
import java.util.Map;

/**
 * Custom sliding length window implementation which holds the last {@code length} events,
 * and gets updated on every event arrival and expiry.
 */
public class CustomWindow extends WindowProcessor implements FindableProcessor {
    /** Maximum number of events retained by the window. */
    private int length;
    /** Number of events currently retained; never exceeds {@link #length}. */
    private int count = 0;
    /** Retained events, stored as EXPIRED-typed clones in arrival order. */
    private ComplexEventChunk<StreamEvent> expiredEventChunk;

    /**
     * The init method of the WindowProcessor; this method will be called before other methods.
     *
     * @param attributeExpressionExecutors the executors of each function parameter
     * @param executionPlanContext         the context of the execution plan
     * @throws ExecutionPlanValidationException if the window is not given exactly one parameter
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        expiredEventChunk = new ComplexEventChunk<StreamEvent>();
        if (attributeExpressionExecutors.length == 1) {
            // The single parameter is read as a constant integer window length.
            length = (Integer) ((ConstantExpressionExecutor) attributeExpressionExecutors[0]).getValue();
        } else {
            throw new ExecutionPlanValidationException("Length window should only have one parameter (<int> windowLength), but found " + attributeExpressionExecutors.length + " input attributes");
        }
    }

    /**
     * The main processing method that will be called upon event arrival.
     *
     * @param streamEventChunk  the stream event chunk that needs to be processed
     * @param nextProcessor     the next processor to which the success events need to be passed
     * @param streamEventCloner helps to clone the incoming event for local storage or modification
     */
    @Override
    protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
        while (streamEventChunk.hasNext()) {
            StreamEvent streamEvent = streamEventChunk.next();
            // Keep an EXPIRED-typed copy so it can be emitted later when it leaves the window.
            StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);
            clonedEvent.setType(StreamEvent.Type.EXPIRED);
            if (count < length) {
                // Window is not full yet: just retain the copy.
                count++;
                this.expiredEventChunk.add(clonedEvent);
            } else {
                // Window is full: emit the oldest retained event ahead of the current one.
                StreamEvent firstEvent = this.expiredEventChunk.poll();
                if (firstEvent != null) {
                    streamEventChunk.insertBeforeCurrent(firstEvent);
                    this.expiredEventChunk.add(clonedEvent);
                } else {
                    // Nothing is retained, so the current event expires immediately.
                    streamEventChunk.insertBeforeCurrent(clonedEvent);
                }
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * To find events from the processor event pool that match the matchingEvent based on finder logic.
     *
     * @param matchingEvent the event to be matched with the events at the processor
     * @param finder        the execution element responsible for finding the corresponding events that match
     *                      the matchingEvent based on the pool of events at the processor
     * @return the matched events
     */
    @Override
    public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder finder) {
        return finder.find(matchingEvent, expiredEventChunk, streamEventCloner);
    }

    /**
     * To construct a finder having the capability of finding events at the processor that correspond to the
     * incoming matchingEvent and the given matching expression logic.
     *
     * @param expression                  the matching expression
     * @param metaComplexEvent            the meta structure of the incoming matchingEvent
     * @param executionPlanContext        current execution plan context
     * @param variableExpressionExecutors the list of variable ExpressionExecutors already created
     * @param eventTableMap               map of event tables
     * @param matchingStreamIndex         the stream index of the incoming matchingEvent
     * @param withinTime                  the maximum time gap between the events to be matched
     * @return finder having the capability of finding events at the processor against the expression and
     * incoming matchingEvent
     */
    @Override
    public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that needs to be
     * persisted for reconstructing the element to the same state at a different point of time.
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[]{expiredEventChunk, count};
    }

    /**
     * Used to restore the serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point of time.
     *
     * @param state the stateful objects of the element as an array, in
     *              the same order provided by currentState().
     */
    @Override
    @SuppressWarnings("unchecked") // state[0] is the chunk that currentState() placed at index 0
    public void restoreState(Object[] state) {
        expiredEventChunk = (ComplexEventChunk<StreamEvent>) state[0];
        count = (Integer) state[1];
    }
}

A sample custom.siddhiext extension mapping file for the custom window extension can be found below:

Code Block
linenumberstrue
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

customWindow=org.wso2.siddhi.extension.customWindow.CustomWindow