Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

To write a custom Window, create a class extending "The Siddhi Function Extension consumes zero or more parameters for each event and outputs a single attribute. This could be used to manipulate event attributes to generate new attributes such as the Function operator. 

To implement a custom function extension, follow the procedure below.

  1. Create a class extending org.wso2.siddhi.core.

...

  1. executor.

...

  1. function.

...

  1. FunctionExecutor.
  2. Create an appropriate .siddhiext

...

  1.  extension mapping file

...

  1. .
  2. Compile the class

...

  1. .
  2. Build the jar containing the .class and the .siddhiext files.
  3. Add

...

  1. the jar to the Siddhi class path.

...

  1. If you need to run them on WSO2 DAS, add

...

  1. it to the <DAS_HOME>/repository/components/

...

  1. dropins directory.

For example, Window a custom function extension created with namespace "custom" and function name "lastUnique"  math as the namespace and sin as the function name can be referred in the a query as follows:shown below. 

Code Block
languagesql
linenumberstrue
from StockExchangeStream[price >= 20]#window.custom:lastUnique(symbol,5) 
select symbol, price
InValueStream
select math:sin(inValue) as sinValue 
insert into StockQuote;

For the Window extension to be used in a Join Query the Window Extension should be findable. To make the Window findable it should implement the "org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor" Interface. 

...

OutMediationStream;
Info
titleNote

From CEP 3.0.0 onwards, FunctionExecutor is supposed to be used for writing both custom expressions and conditions.

The following is a sample implementation of a custom function extension.

Code Block
languagejava
linenumberstrue
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * LicensedWSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License");  * you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
software * *software distributed under the License is distributed on an
 * "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.
 * See the License for the
 * specific language governing permissions and limitations
 * limitations under the License.
 */

package org.wso2.siddhi.core.query.processor.stream.windowextension.math;


import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunkexception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.event.MetaComplexEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.VariableExpressionExecutorFunctionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.table.EventTable;
import org.wso2.siddhi.core.util.collection.operator.Finder;
import org.wso2.siddhi.core.util.parser.CollectionOperatorParserapi.definition.Attribute;
import org.wso2.siddhi.query.api.expressionexception.ExpressionExecutionPlanValidationException;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LastUniqueWindowProcessor extends WindowProcessor implements FindableProcessor{
    private ConcurrentHashMap<String, StreamEvent> map = new ConcurrentHashMap<String, StreamEvent>();
    private VariableExpressionExecutor[] variableExpressionExecutors;/*
* sin(a);
* Returns the sine of a (a is in radians).
* Accept Type(s) :DOUBLE/INT/FLOAT/LONG
* Return Type(s): DOUBLE
*/
public class SinFunctionExtension extends FunctionExecutor {

    /**
     * The initinitialization method offor the WindowProcessorSinFunctionExtension, this method will be called before the other methods
     *
     * @param attributeExpressionExecutors the executors of each function parametersparameter
     * @param executionPlanContext         the context of the execution plan
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        variableExpressionExecutors = new VariableExpressionExecutor[if (attributeExpressionExecutors.length]; != 1) {
     for (int i = 0; i < attributeExpressionExecutors.length; i++) {
        throw new ExecutionPlanValidationException("Invalid no of arguments passed to math:sin() function, " +
   variableExpressionExecutors[i] =(VariableExpressionExecutor) attributeExpressionExecutors[i];         }     } "required 1, but found " /**
+ attributeExpressionExecutors.length);
    * The main processing method}
that will be called upon event arrival  Attribute.Type attributeType =  *attributeExpressionExecutors[0].getReturnType();
      * @param streamEventChunkif  the stream event chunk that need to be processed(!((attributeType == Attribute.Type.DOUBLE)
            * @param nextProcessor  || (attributeType  the next processor to which the success events need to be passed== Attribute.Type.INT)
                *|| @param(attributeType streamEventCloner helps to clone the incoming event for local storage or modification== Attribute.Type.FLOAT)
               */ || (attributeType ==  @OverrideAttribute.Type.LONG))) {
    protected synchronized void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner)throw {new ExecutionPlanValidationException("Invalid parameter type found for the argument of ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<StreamEvent>();math:sin() function, " +
          StreamEvent streamEvent = streamEventChunk.getFirst();       "required " while (streamEvent != null) {
   + Attribute.Type.INT + " or " + Attribute.Type.LONG +
        StreamEvent clonedEvent = streamEventCloner.copyStreamEvent(streamEvent);         " or " + clonedEventAttribute.setType(StreamEvent.Type.EXPIRED);

    FLOAT + " or " + Attribute.Type.DOUBLE +
       StreamEvent oldEvent = map.put(generateKey(clonedEvent), clonedEvent);         ", but found " if+ (oldEvent != null) {attributeType.toString());
        }
    }

    /**
  complexEventChunk.add(oldEvent);   * The main execution method which will be called upon }event arrival
     * when there are more than StreamEventone nextfunction = streamEvent.getNext();parameter
     *
     *  streamEvent.setNext(null);
     @param data the runtime values of function parameters
      complexEventChunk.add(streamEvent);
  * @return the function result
     */
   streamEvent =@Override
next;    protected Object execute(Object[] data) {
}         nextProcessor.process(complexEventChunk)return null;
    }

    /**
     * This The main execution method which will be called onlyupon onceevent andarrival
this can be used to acquire* when there are zero or *one requiredfunction resourcesparameter
for the processing element.  *
   * This will* be@param calleddata afternull initializingif the systemfunction andparameter beforecount is zero or
  * starting to process* the events.      */     @Overrideruntime data value of the publicfunction voidparameter
start() {    * @return the function result
//Do nothing    */
}    @Override
 /**   protected Object execute(Object *data) This{
will be called only once and this can beif used(data to!= releasenull) {
    * the acquired resources for processing.   //type-conversion
  * This will be called before shutting down the system. if (data instanceof Integer) {
*/     @Override     public void stop() {   int inputInt = (Integer) data;
 //Do nothing     }      /**   return Math.sin((double) inputInt);
* Used to collect the serializable state of the processing element, that need} toelse beif (data instanceof Long) {
 * persisted for the reconstructing the element to the same state on a different point oflong timeinputLong = (Long) data;
  *      * @return stateful objects of the processing element as an arrayreturn Math.sin((double) inputLong);
     */     @Override  } else if public Object[] currentState((data instanceof Float) {
        return   new Object[]{map};    float }inputFloat = (Float) data;
  /**      * Used to restore serialized state of the processing element, for reconstructingreturn Math.sin((double) inputFloat);
      * the element to the same state} aselse if was(data oninstanceof aDouble) previous{
point of time.      *      * @param state the stateful objects of the element as an array onreturn Math.sin((Double) data);
            }
*        } else {
   the same order provided by currentState().    throw  */
    @Override
 new ExecutionPlanRuntimeException("Input to the math:sin() function cannot be null");
  public void restoreState(Object[] state) {  }
      map = (ConcurrentHashMap<String, StreamEvent>) state[0]return null;
    }

    /**
     * To find events from the processor event pool, that the matches the matchingEvent based on finder logic This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the *system @paramand matchingEventbefore
the event to be matched with* thestarting eventsto atprocess the processorevents.
     */
  @param finder @Override
    public void thestart() execution{
element responsible for finding the corresponding events that matches//Implement start logic to acquire relevant *resources
    }

    /**
     * This will be called theonly matchingEventonce basedand onthis poolcan ofbe eventsused atto Processorrelease
     * the @returnacquired theresources matchedfor eventsprocessing.
     */ This will be called @Overridebefore shutting down the system.
public synchronized StreamEvent find(ComplexEvent matchingEvent, Finder*/
finder) {   @Override
    public returnvoid finder.find(matchingEvent, map.values(),streamEventCloner);
stop() {
   }      //**Implement stop logic to release the *acquired Toresources
construct a finder having the}
capability
of finding events at the@Override
processor that corresponds to thepublic incoming
 Attribute.Type getReturnType() {
   * matchingEvent and the given matching expression logic.return Attribute.Type.DOUBLE;
    }
*

    /**
@param expression    * Used to collect the serializable state of the processing element, that need to thebe
matching expression    * persisted *for @paramthe metaComplexEventreconstructing the element to the same state on a different point of thetime
meta structure of the incoming matchingEvent*
     * @param@return executionPlanContextstateful objects of the processing element as an currentarray
execution plan context   */
  * @param variableExpressionExecutors@Override
the list of variable ExpressionExecutorspublic already created
Object[] currentState() {
    * @param eventTableMap  return null;
    }

    /**
map of event tables  * Used to restore *serialized @paramstate matchingStreamIndexof the processing element, for reconstructing
   the stream index* ofthe theelement incomingto matchingEventthe same state as if was *on @parama withinTimeprevious point of time.
     *
     * @param state the maximumstateful timeobjects gapof betweenthe theelement eventsas toan bearray matchedon
     * @return  finder having the capability of finding events at the processor against the expression and incomingsame order provided by currentState().
     */
    matchingEvent@Override
    public */
 void restoreState(Object[] state) {
  @Override     public Finder constructFinder(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) {
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);

    }

    private String generateKey(StreamEvent event) {
        StringBuilder stringBuilder = new StringBuilder();
        for (VariableExpressionExecutor executor : variableExpressionExecutors) {
            stringBuilder.append(event.getAttribute(executor.getPosition()));
        }
        return stringBuilder.toString();
    }

}
//Implement restore state logic.
    }
}

Sample math.siddhiext extension mapping file for the  custom function extension can be found below;

Code Block
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sin=org.wso2.siddhi.extension.math.SinFunctionExtension