Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

The Siddhi Function Extension consumes zero or more parameters for each event and outputs a single attribute. This could be used to manipulate event attributes to generate new attributes such as the Function operator. Stream Function Extension allows events to be modified by adding one or more attributes to it. Events can be output upon each event arrival.

To implement a custom stream function extension, follow the procedure below.

  1. Create a class extending org.wso2.siddhi.core.query.processor.executorstream.function.FunctionExecutorStreamFunctionProcessor.
  2. Create an appropriate appropriate .siddhiext extension extension mapping file.
  3. Compile the class.
  4. Build the jar containing the the .class and the .siddhiext files.
  5. Add the jar to the Siddhi class path. If you need to run them the extension on WSO2 DAS, add it the jar to the the <DAS_HOME>/repository/components/dropins directorydropins directory.

For example, a custom function Stream Function extension created with math geo as the namespace and sin geocode as the function name can be referred in a query as shown below. 

Code Block
languagesql
linenumberstrue
from InValueStreamgeocodeStream#geo:geocode(location)
select math:sin(inValue) as sinValue latitude, longitude, formattedAddress
insert into OutMediationStream;
Info
titleNote

From CEP 3.0.0 onwards, FunctionExecutor is supposed to be used for writing both custom expressions and conditions.

...

dataOut;

The following is a sample implementation of a custom stream function extension. 

Code Block
languagejava
linenumberstrue
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.mathgeo;

import orgcom.wso2google.siddhicode.coregeocoder.config.ExecutionPlanContextGeocoder;
import orgcom.wso2google.siddhicode.core.exception.ExecutionPlanRuntimeExceptiongeocoder.GeocoderRequestBuilder;
import com.google.code.geocoder.model.GeocodeResponse;
import org.wso2.siddhi.core.executor.ExpressionExecutorcom.google.code.geocoder.model.GeocoderRequest;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.executorconfig.function.FunctionExecutorExecutionPlanContext;
import org.wso2.siddhi.querycore.apiexception.definition.AttributeExecutionPlanCreationException;
import org.wso2.siddhi.querycore.api.exception.ExecutionPlanValidationException;

/*
* sin(a);
* Returns the sine of a (a is in radians).
* Accept Type(s) :DOUBLE/INT/FLOAT/LONG
* Return Type(s): DOUBLE
*/
public class SinFunctionExtension extends FunctionExecutor {exception.ExecutionPlanRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * This extension transforms a location into its geo-coordinates and formatted
 * address
 */
public class GeocodeStreamFunctionProcessor extends StreamFunctionProcessor {

    private static final Logger LOGGER = Logger.getLogger(GeocodeStreamFunctionProcessor.class);
    private final Geocoder geocoder = new Geocoder();
    private boolean debugModeOn;

    /**
     * The process method of the GeocodeStreamFunctionProcessor, used when more than one function parameters are provided
     *
     * @param data the data values for the function parameters
     /** @return the data for additional output attributes introduced *by Thethe initializationfunction
method for SinFunctionExtension, this method will*/
be called before the other@Override
methods    protected  *
 Object[] process(Object[] data) {
   * @param attributeExpressionExecutors the executors of each function parameterreturn process(data[0]);
    }

 * @param executionPlanContext /**
     * The theprocess contextmethod of the GeocodeStreamFunctionProcessor, executionused planwhen zero or one function parameter */is provided
   @Override  *
  protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        if (attributeExpressionExecutors.length != 1) {
  * @param data null if the function parameter count is zero or runtime data value of the function parameter
     * @return the data for throwadditional new ExecutionPlanValidationException("Invalid no of arguments passed to math:sin() function, " +
output attribute introduced by the function
     */
    @Override
    protected Object[] process(Object data) {
     "required 1, but foundString "location += attributeExpressionExecutorsdata.lengthtoString();

       } // Make the geocode request to API library
Attribute.Type attributeType = attributeExpressionExecutors[0].getReturnType();     GeocoderRequest geocoderRequest = new if (!((attributeType == Attribute.Type.DOUBLE)
 GeocoderRequestBuilder()
              || (attributeType == Attribute.Type.INT.setAddress(location)
                || (attributeType == Attribute.Type.FLOAT).setLanguage("en")
                .getGeocoderRequest();

 || (attributeType == Attribute.Type.LONG))) {   double latitude, longitude;
       throw new ExecutionPlanValidationException("Invalid parameter type found for the argument of math:sin() function, " +String formattedAddress;
        try {
            GeocodeResponse geocoderResponse = geocoder.geocode(geocoderRequest);

    "required " + Attribute.Type.INT + " or " +if Attribute.Type.LONG +
(!geocoderResponse.getResults().isEmpty()) {
                latitude   " or " + Attribute.Type.FLOAT + " or " + Attribute.Type.DOUBLE += geocoderResponse.getResults().get(0).getGeometry().getLocation()
                        .getLat().doubleValue();
 ", but found " + attributeType.toString());         } longitude = geocoderResponse.getResults().get(0).getGeometry().getLocation()
 }      /**      * The main execution method which will be called upon event arrival .getLng().doubleValue();
     * when there are more than one function parameter   formattedAddress =  *geocoderResponse.getResults().get(0).getFormattedAddress();
      * @param data the runtime values of function} parameterselse {
    * @return the function result      */   latitude = @Override-1.0;
    protected Object execute(Object[] data) {        longitude return null= -1.0;
    }      /**      *formattedAddress The main execution method which will be called upon event arrival= "N/A";
          * when there}
are
zero or one function parameter    } catch *(IOException e) {
   * @param data null if the function parameter count isthrow zero or
     * new ExecutionPlanRuntimeException("Error in connection to Google Maps API.", e);
        }

 runtime data value of the function parameter if (debugModeOn) {
  * @return the function result      */
    @Override
    protected Object execute(Object data) {LOGGER.debug("Formatted address: " + formattedAddress + ", Location coordinates: (" +
            if (data != null) {    latitude + ", " + longitude + ")");
 //type-conversion       }
      if (data instanceofreturn Integer)new Object[]{formattedAddress, latitude, longitude};
    }

    /**
    int inputInt* =The (Integer) data;
          init method of the GeocodeStreamFunctionProcessor, this method will be called before other methods
     return Math.sin((double) inputInt);*
     * @param inputDefinition       } else if (data instanceof Long) { the incoming stream definition
     * @param attributeExpressionExecutors the executors of each longfunction parameters
 inputLong = (Long) data; * @param executionPlanContext         the context of the execution return Math.sin((double) inputLong);plan
     * @return the additional output attributes introduced by the }function
else if (data instanceof Float) {*/
    @Override
    protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
float inputFloat = (Float) data;    debugModeOn = LOGGER.isDebugEnabled();
          return Math.sin((double) inputFloat);
if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
           } elsethrow ifnew ExecutionPlanCreationException(data"First instanceofparameter Double)should {be of type string");
         }
   return Math.sin((Double) data);   ArrayList<Attribute> attributes = new ArrayList<Attribute>(6);
     }   attributes.add(new Attribute("formattedAddress", Attribute.Type.STRING));
     } else { attributes.add(new Attribute("latitude", Attribute.Type.DOUBLE));
         throw attributes.add(new ExecutionPlanRuntimeExceptionAttribute("longitude"Input to the math:sin() function cannot be null");
        }, Attribute.Type.DOUBLE));
        return nullattributes;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
         {
		//Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    @Override
    public Attribute.Type getReturnType void stop() {
		//Implement stop logic to release the   return Attribute.Type.DOUBLE;acquired resources
    }

     /**
     * Used to collect the serializable state of the processing element, that need to be
     * persisted for the reconstructing the element to the same state on a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return nullnew Object[0];
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as if was on a previous point of time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        		//Implement restore state logic.
    }
}

Sample mathgeo.siddhiext extension mapping file for the  custom stream function extension can be found below;

Code Block
linenumberstrue
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

singeocode=org.wso2.siddhi.extension.mathgeo.SinFunctionExtensionGeocodeStreamFunctionProcessor