The Stream Function Extension allows events to be modified by adding one or more attributes to them. Events can be output upon each event arrival.

To implement a custom stream function, follow the procedure below.

  1. Create a class extending org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor.
  2. Create an appropriate .siddhiext extension mapping file.
  3. Compile the class.
  4. Build the jar containing the .class and the .siddhiext files.
  5. Add the jar to the Siddhi class path. If you need to run the extension on WSO2 DAS, add the jar to the <DAS_HOME>/repository/components/dropins directory.
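
The jar-building step is normally handled by a build tool or the jar command. Purely as an illustration of what the resulting archive should contain, the following sketch packages one compiled class and one mapping file using only the JDK. The class name ExtensionJarBuilder and its parameters are hypothetical, and placing the .siddhiext file at the jar root is an assumption rather than something this page states.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

/** Hypothetical helper: packages an extension class and its mapping file into a jar. */
class ExtensionJarBuilder {

    /**
     * Writes a jar at jarPath that contains the class file under classEntryName
     * (its package path, e.g. "org/example/MyFunction.class") and the mapping
     * file at the jar root (assumed location).
     */
    static void buildJar(Path jarPath, Path classFile, String classEntryName,
                         Path mappingFile) throws IOException {
        try (OutputStream out = Files.newOutputStream(jarPath);
             JarOutputStream jar = new JarOutputStream(out)) {
            addEntry(jar, classEntryName, classFile);
            addEntry(jar, mappingFile.getFileName().toString(), mappingFile);
        }
    }

    // Copies one file into the jar under the given entry name.
    private static void addEntry(JarOutputStream jar, String name, Path source)
            throws IOException {
        jar.putNextEntry(new JarEntry(name));
        Files.copy(source, jar);
        jar.closeEntry();
    }
}
```

Listing the jar afterwards (for example with jar tf) is a quick way to confirm that both the .class and the .siddhiext entries are present before copying it to the class path.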

For example, a Stream Function extension created with geo as the namespace and geocode as the function name can be referred to in a query as shown below.

Code Block
from geocodeStream#geo:geocode(location)
select latitude, longitude, formattedAddress
insert into dataOut;
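
To make the effect of such a query concrete, the following plain-Java sketch mimics what a stream function does to each event: it reads one input attribute and appends the function's output attributes to the event data. It is not Siddhi's internal implementation. The class StreamFunctionSketch, the stand-in geocode method, and the sample values are all hypothetical; only the fallback values ("N/A", -1.0, -1.0) mirror the sample extension on this page.

```java
import java.util.Arrays;

/** Hypothetical illustration of a stream function appending attributes to an event. */
class StreamFunctionSketch {

    /** Stand-in for the geocode lookup: returns {formattedAddress, latitude, longitude}. */
    static Object[] geocode(Object location) {
        // Made-up fixed answer, used only so the sketch has something to return.
        if ("Sample Place".equals(location)) {
            return new Object[]{"1 Sample Street", 1.0, 2.0};
        }
        // Fallback values for an unknown location.
        return new Object[]{"N/A", -1.0, -1.0};
    }

    /** Returns a copy of the event data with the function's output attributes appended. */
    static Object[] apply(Object[] eventData, int parameterIndex) {
        Object[] added = geocode(eventData[parameterIndex]);
        Object[] result = Arrays.copyOf(eventData, eventData.length + added.length);
        System.arraycopy(added, 0, result, eventData.length, added.length);
        return result;
    }
}
```

In this sketch the original attributes stay in place and the new ones are appended after them, which is why the select clause of the query can refer to latitude, longitude and formattedAddress alongside the attributes of the input stream.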

The following is a sample implementation of a custom stream function extension.

Code Block
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.geo;

import com.google.code.geocoder.Geocoder;
import com.google.code.geocoder.GeocoderRequestBuilder;
import com.google.code.geocoder.model.GeocodeResponse;
import com.google.code.geocoder.model.GeocoderRequest;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * This extension transforms a location into its geo-coordinates and formatted
 * address
 */
public class GeocodeStreamFunctionProcessor extends StreamFunctionProcessor {

    private static final Logger LOGGER = Logger.getLogger(GeocodeStreamFunctionProcessor.class);
    private final Geocoder geocoder = new Geocoder();
    private boolean debugModeOn;

    /**
     * The process method of the GeocodeStreamFunctionProcessor, used when more than one function parameter is provided
     *
     * @param data the data values for the function parameters
     * @return the data for additional output attributes introduced by the function
     */
    @Override
    protected Object[] process(Object[] data) {
        return process(data[0]);
    }

    /**
     * The process method of the GeocodeStreamFunctionProcessor, used when zero or one function parameter is provided
     *
     * @param data null if the function parameter count is zero, or the runtime data value of the function parameter
     * @return the data for additional output attributes introduced by the function
     */
    @Override
    protected Object[] process(Object data) {
        String location = data.toString();

        // Make the geocode request to API library
        GeocoderRequest geocoderRequest = new GeocoderRequestBuilder()
                .setAddress(location)
                .setLanguage("en")
                .getGeocoderRequest();

        double latitude, longitude;
        String formattedAddress;
        try {
            GeocodeResponse geocoderResponse = geocoder.geocode(geocoderRequest);

            if (!geocoderResponse.getResults().isEmpty()) {
                latitude = geocoderResponse.getResults().get(0).getGeometry().getLocation()
                        .getLat().doubleValue();
                longitude = geocoderResponse.getResults().get(0).getGeometry().getLocation()
                        .getLng().doubleValue();
                formattedAddress = geocoderResponse.getResults().get(0).getFormattedAddress();
            } else {
                latitude = -1.0;
                longitude = -1.0;
                formattedAddress = "N/A";
            }
        } catch (IOException e) {
            throw new ExecutionPlanRuntimeException("Error in connection to Google Maps API.", e);
        }

        if (debugModeOn) {
            LOGGER.debug("Formatted address: " + formattedAddress + ", Location coordinates: (" +
                    latitude + ", " + longitude + ")");
        }
        return new Object[]{formattedAddress, latitude, longitude};
    }

    /**
     * The init method of the GeocodeStreamFunctionProcessor, this method will be called before other methods
     *
     * @param inputDefinition              the incoming stream definition
     * @param attributeExpressionExecutors the executors of each function parameter
     * @param executionPlanContext         the context of the execution plan
     * @return the additional output attributes introduced by the function
     */
    @Override
    protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        debugModeOn = LOGGER.isDebugEnabled();
        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
            throw new ExecutionPlanCreationException("First parameter should be of type string");
        }
        ArrayList<Attribute> attributes = new ArrayList<Attribute>(6);
        attributes.add(new Attribute("formattedAddress", Attribute.Type.STRING));
        attributes.add(new Attribute("latitude", Attribute.Type.DOUBLE));
        attributes.add(new Attribute("longitude", Attribute.Type.DOUBLE));
        return attributes;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element that needs to be
     * persisted for reconstructing the element to the same state at a different point in time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[0];
    }

    /**
     * Used to restore the serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point in time.
     *
     * @param state the stateful objects of the element as an array, in
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        //Implement restore state logic.
    }
}
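
The sample above holds no state, so currentState() returns an empty array and restoreState() does nothing. For an extension that does hold state, the two methods are expected to mirror each other: whatever is placed in the array by currentState() is read back in the same order by restoreState(). The following is a minimal sketch of that pairing in plain Java. The class CachedLookupState and its cache of previously resolved addresses are hypothetical and do not extend any Siddhi class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stateful element showing how currentState and restoreState pair up. */
class CachedLookupState {

    // Example state: locations that have already been resolved to an address.
    private Map<String, String> cache = new HashMap<String, String>();

    void remember(String location, String formattedAddress) {
        cache.put(location, formattedAddress);
    }

    String lookup(String location) {
        return cache.get(location);
    }

    /** Returns a snapshot of the state; element 0 is a copy of the cache. */
    Object[] currentState() {
        return new Object[]{new HashMap<String, String>(cache)};
    }

    /** Restores the state from an array in the order produced by currentState(). */
    @SuppressWarnings("unchecked")
    void restoreState(Object[] state) {
        cache = new HashMap<String, String>((Map<String, String>) state[0]);
    }
}
```

Copying the map in both directions keeps the snapshot independent of later changes to the live element, which is one reasonable design choice when the snapshot may be serialized some time after it is taken.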

A sample geo.siddhiext extension mapping file for the custom stream function extension is given below.

Code Block
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

geocode=org.wso2.siddhi.extension.geo.GeocodeStreamFunctionProcessor
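
The mapping file above uses # comment lines and a name=class entry, which matches the syntax accepted by java.util.Properties. Assuming the file can be read that way (this page does not say how Siddhi itself loads it), the following sketch resolves a function name to its implementing class name. The class SiddhiExtMappingReader is hypothetical and is meant only as a quick sanity check of a mapping file's contents.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper: reads .siddhiext content as Java properties. */
class SiddhiExtMappingReader {

    /**
     * Parses the mapping-file text and returns the class name registered for
     * functionName, or null when the file has no entry for that name.
     */
    static String resolveClassName(String mappingFileContent, String functionName)
            throws IOException {
        Properties mapping = new Properties();
        mapping.load(new StringReader(mappingFileContent));
        return mapping.getProperty(functionName);
    }
}
```

For the sample file, looking up geocode would yield org.wso2.siddhi.extension.geo.GeocodeStreamFunctionProcessor, which should match the fully qualified name of the compiled class in the jar.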