A Siddhi Stream Function Extension allows events to be altered by adding one or more attributes to them. Events can be output upon each event arrival.

To implement a custom stream function, create a class extending "org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor" and create an appropriate .siddhiext extension mapping file. Then compile the class, build a jar containing the .class and .siddhiext files, and add the jar to the Siddhi class path. When running on WSO2 DAS, add the jar to <DAS_HOME>/repository/components/dropins.

For example, a Stream Function Extension created with namespace "geo" and function name "geocode" can be referred to in a query as follows:

Code Block
from geocodeStream#geo:geocode(location)
select latitude, longitude, formattedAddress
insert into dataOut;
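
To make the effect of the query above more concrete, the following is a minimal sketch of what a stream function does to each event: it takes one attribute as its parameter and appends the computed attributes to the event. This is a simplified model and not the Siddhi API. The class StreamFunctionSketch, the apply helper and the fakeGeocode stand-in are hypothetical names introduced only for illustration, and no geocoding service is called.

```java
import java.util.Arrays;
import java.util.function.Function;

// Simplified model of a stream function. This is NOT the Siddhi API;
// all names in this class are hypothetical.
class StreamFunctionSketch {

    // Returns a new event that holds the original attributes followed by
    // the attributes the function computes from the chosen parameter.
    static Object[] apply(Object[] event, int parameterIndex, Function<Object, Object[]> function) {
        Object[] extra = function.apply(event[parameterIndex]);
        Object[] extended = Arrays.copyOf(event, event.length + extra.length);
        System.arraycopy(extra, 0, extended, event.length, extra.length);
        return extended;
    }

    public static void main(String[] args) {
        // Stand-in for geo:geocode that returns fixed values
        // (formattedAddress, latitude, longitude) instead of calling a service.
        Function<Object, Object[]> fakeGeocode =
                location -> new Object[]{"formatted " + location, 1.5, 2.5};

        Object[] incoming = {"somewhere"};
        Object[] outgoing = apply(incoming, 0, fakeGeocode);

        // prints [somewhere, formatted somewhere, 1.5, 2.5]
        System.out.println(Arrays.toString(outgoing));
    }
}
```

In the query, the select clause can then pick any of the appended attributes (latitude, longitude, formattedAddress) alongside the original ones.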

A sample implementation of a custom stream function extension can be found below:

Code Block
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.geo;

import com.google.code.geocoder.Geocoder;
import com.google.code.geocoder.GeocoderRequestBuilder;
import com.google.code.geocoder.model.GeocodeResponse;
import com.google.code.geocoder.model.GeocoderRequest;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * This extension transforms a location into its geo-coordinates and formatted
 * address
 */
public class GeocodeStreamFunctionProcessor extends StreamFunctionProcessor {

    private static final Logger LOGGER = Logger.getLogger(GeocodeStreamFunctionProcessor.class);
    private final Geocoder geocoder = new Geocoder();
    private boolean debugModeOn;

    /**
     * The process method of the GeocodeStreamFunctionProcessor, used when more than one function parameters are provided
     *
     * @param data the data values for the function parameters
     * @return the data for additional output attributes introduced by the function
     */
    @Override
    protected Object[] process(Object[] data) {
        return process(data[0]);
    }

    /**
     * The process method of the GeocodeStreamFunctionProcessor, used when zero or one function parameter is provided
     *
     * @param data null if the function parameter count is zero or runtime data value of the function parameter
     * @return the data for additional output attribute introduced by the function
     */
    @Override
    protected Object[] process(Object data) {
        String location = data.toString();

        // Make the geocode request to API library
        GeocoderRequest geocoderRequest = new GeocoderRequestBuilder()
                .setAddress(location)
                .setLanguage("en")
                .getGeocoderRequest();

        double latitude, longitude;
        String formattedAddress;
        try {
            GeocodeResponse geocoderResponse = geocoder.geocode(geocoderRequest);

            if (!geocoderResponse.getResults().isEmpty()) {
                latitude = geocoderResponse.getResults().get(0).getGeometry().getLocation()
                        .getLat().doubleValue();
                longitude = geocoderResponse.getResults().get(0).getGeometry().getLocation()
                        .getLng().doubleValue();
                formattedAddress = geocoderResponse.getResults().get(0).getFormattedAddress();
            } else {
                latitude = -1.0;
                longitude = -1.0;
                formattedAddress = "N/A";
            }

        } catch (IOException e) {
            throw new ExecutionPlanRuntimeException("Error in connection to Google Maps API.", e);
        }

        if (debugModeOn) {
            LOGGER.debug("Formatted address: " + formattedAddress + ", Location coordinates: (" +
                    latitude + ", " + longitude + ")");
        }
        return new Object[]{formattedAddress, latitude, longitude};
    }

    /**
     * The init method of the GeocodeStreamFunctionProcessor, this method will be called before other methods
     *
     * @param inputDefinition              the incoming stream definition
     * @param attributeExpressionExecutors the executors of each function parameters
     * @param executionPlanContext         the context of the execution plan
     * @return the additional output attributes introduced by the function
     */
    @Override
    protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        debugModeOn = LOGGER.isDebugEnabled();
        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
            throw new ExecutionPlanCreationException("First parameter should be of type string");
        }
        ArrayList<Attribute> attributes = new ArrayList<Attribute>(6);
        attributes.add(new Attribute("formattedAddress", Attribute.Type.STRING));
        attributes.add(new Attribute("latitude", Attribute.Type.DOUBLE));
        attributes.add(new Attribute("longitude", Attribute.Type.DOUBLE));
        return attributes;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that need to be
     * persisted for the reconstructing the element to the same state on a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[0];
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as if was on a previous point of time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        //Implement restore state logic.
    }
}
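
The output contract of the sample's process method can be looked at in isolation. The sketch below mirrors only that part: the values are returned in the order formattedAddress, latitude, longitude (the order in which init declares the attributes), and placeholder values are returned when the lookup yields no result. The class GeocodeFallbackSketch, the GeoResult type and the toOutput helper are hypothetical and are not part of Siddhi or the geocoder library; the Optional simply stands in for "first result of the response, if any".

```java
import java.util.Arrays;
import java.util.Optional;

// Hypothetical, self-contained model of how the sample maps a geocoding
// result to the output attributes. Not part of Siddhi or the geocoder library.
class GeocodeFallbackSketch {

    // Illustrative value type holding the three values the sample reads
    // from the first entry of the geocoder response.
    static final class GeoResult {
        final String formattedAddress;
        final double latitude;
        final double longitude;

        GeoResult(String formattedAddress, double latitude, double longitude) {
            this.formattedAddress = formattedAddress;
            this.latitude = latitude;
            this.longitude = longitude;
        }
    }

    // Builds the output array in the declared attribute order:
    // formattedAddress, latitude, longitude.
    static Object[] toOutput(Optional<GeoResult> firstResult) {
        if (firstResult.isPresent()) {
            GeoResult result = firstResult.get();
            return new Object[]{result.formattedAddress, result.latitude, result.longitude};
        }
        // Same placeholders as the sample uses for an empty response.
        return new Object[]{"N/A", -1.0, -1.0};
    }

    public static void main(String[] args) {
        Optional<GeoResult> found = Optional.of(new GeoResult("Example Street 1", 10.0, 20.0));

        // prints [Example Street 1, 10.0, 20.0]
        System.out.println(Arrays.toString(toOutput(found)));
        // prints [N/A, -1.0, -1.0]
        System.out.println(Arrays.toString(toOutput(Optional.empty())));
    }
}
```

Since -1.0 is itself a valid latitude and longitude, a downstream query that needs to detect a failed lookup may be better off checking formattedAddress for "N/A" rather than the coordinates.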

A sample geo.siddhiext extension mapping file for the custom stream function extension can be found below:

Code Block
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

geocode=org.wso2.siddhi.extension.geo.GeocodeStreamFunctionProcessor
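
The mapping file uses a plain key=value layout with # comment lines, which java.util.Properties can read. The sketch below shows one way a reference such as geo:geocode could be resolved to a class name from such a file. It is only an illustration: how Siddhi itself loads .siddhiext files is not shown here, the assumption that the namespace corresponds to the file name (geo.siddhiext) is inferred from the example above, and the class SiddhiExtMappingSketch and its resolve method are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper; this is not how Siddhi necessarily loads extensions.
class SiddhiExtMappingSketch {

    // Resolves a "namespace:function" reference against the content of the
    // mapping file assumed to belong to the given namespace.
    // Returns the mapped class name, or null if nothing matches.
    static String resolve(String namespace, String mappingFileContent, String reference) {
        String[] parts = reference.split(":", 2);
        if (parts.length != 2 || !parts[0].equals(namespace)) {
            return null;
        }
        Properties mapping = new Properties();
        try {
            // Properties treats lines starting with '#' as comments.
            mapping.load(new StringReader(mappingFileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return mapping.getProperty(parts[1]);
    }

    public static void main(String[] args) {
        String geoSiddhiExt =
                "# license header omitted\n"
                + "geocode=org.wso2.siddhi.extension.geo.GeocodeStreamFunctionProcessor\n";

        // prints org.wso2.siddhi.extension.geo.GeocodeStreamFunctionProcessor
        System.out.println(resolve("geo", geoSiddhiExt, "geo:geocode"));
    }
}
```

The key on the left of the mapping is the function name used in the query, and the value is the fully qualified name of the implementing class, so the two must stay in sync when the class is renamed or moved.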