The Stream Function Extension allows events to be modified by adding one or more attributes to it. Events can be output upon each event arrival.

To implement a custom stream function, follow the procedure below.

For example, a Stream Function extension created with geo as the namespace and geocode as the function name can be referred in a query as shown below. 

from geocodeStream#geo:geocode(location) 
select symbol, pricegeocodeStream#geo:geocode(location)
select latitude, longitude, formattedAddress
insert into dataOut;

 

The following is a sample implementation of a custom stream function extension. 

 * This extension transforms a location into its geo-coordinates and formatted
 * address
public class GeocodeStreamFunctionProcessor extends StreamFunctionProcessor {

    /**static final Logger LOGGER = Logger.getLogger(GeocodeStreamFunctionProcessor.class);
    private final Geocoder * The init method of the WindowProcessor, this method will be called before other methodsgeocoder = new Geocoder();
    private boolean debugModeOn;

     * The process method of the *GeocodeStreamFunctionProcessor, @paramused attributeExpressionExecutorswhen themore executorsthan of eachone function parameters are provided
@param executionPlanContext    * @param data the data thevalues contextfor of the executionfunction planparameters
     */ @return the data for @Overrideadditional output attributes introduced by protectedthe voidfunction
init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { */
  protected Object[] process(Object[] data) {
        return process(data[0]);
    }
     for (int i =return process(data[0]);
i < attributeExpressionExecutors.length; i++) {}

     * variableExpressionExecutors[i] =(VariableExpressionExecutor) attributeExpressionExecutors[i];

    /*The process method of the GeocodeStreamFunctionProcessor, used when zero or one function parameter is provided
     * @param Thedata mainnull processingif methodthe thatfunction willparameter becount calledis uponzero eventor arrivalruntime data value of the function *parameter
     * @param@return streamEventChunkthe data thefor streamadditional eventoutput chunkattribute thatintroduced needby tothe befunction
processed      */
@param nextProcessor   @Override
 the next processor toprotected which the success events need to be passedObject[] process(Object data) {
        String *location @param streamEventCloner helps to clone the incoming event for local storage or modification
     */= data.toString();

        // Make the geocode request to API library
    @Override    GeocoderRequest protectedgeocoderRequest synchronized= voidnew processGeocoderRequestBuilder(ComplexEventChunk<StreamEvent>)
streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {         ComplexEventChunk<StreamEvent> complexEventChunk =.setAddress(location)
new ComplexEventChunk<StreamEvent>();          StreamEvent streamEvent = streamEventChunk.getFirst();   .setLanguage("en")
        while (streamEvent != null) {    .getGeocoderRequest();

       StreamEvent clonedEventdouble = streamEventCloner.copyStreamEvent(streamEvent);latitude, longitude;
        String formattedAddress;
   clonedEvent.setType(StreamEvent.Type.EXPIRED);     try {
       StreamEvent oldEvent = map.put(generateKey(clonedEvent), clonedEvent);   GeocodeResponse geocoderResponse = geocoder.geocode(geocoderRequest);

            if (oldEvent != null!geocoderResponse.getResults().isEmpty()) {
                latitude = complexEventChunk.add(oldEvent);
        }             StreamEvent next = streamEvent.getNext.getLat().doubleValue();
            streamEvent.setNext(null);    longitude = geocoderResponse.getResults().get(0).getGeometry().getLocation()
      complexEventChunk.add(streamEvent);             streamEvent = next;   .getLng().doubleValue();
      }          formattedAddress  nextProcessor.process(complexEventChunk= geocoderResponse.getResults().get(0).getFormattedAddress();
    }      /**  } else {
 * This will be called only once and this can be used to acquire   latitude = -1.0;
* required resources for the processing element.      * This will be calledlongitude after initializing the system and before= -1.0;
         * starting to process the events.  formattedAddress = "N/A";
 */     @Override     public void}
{        } //Do nothing
 catch (IOException e) {
  }      /**    throw new * This will be called only once and this can be used to releaseExecutionPlanRuntimeException("Error in connection to Google Maps API.", e);

* the acquired resources for processing.   if (debugModeOn) {
* This will be called before shutting down the system.   LOGGER.debug("Formatted address: " */+ formattedAddress + ", Location @Overridecoordinates: (" +
  public void stop() {         //Do nothing     }latitude + ", " + longitude /**
+ ")");
    * Used to collect the}
serializable state of the processing element, that need toreturn be
  return new Object[]{formattedAddress, latitude, longitude};
    }
  * persisted for}
reconstructing the element to the/**
same state on a different point* ofThe timeinit method of the GeocodeStreamFunctionProcessor, this *method will be called before other *methods
@return stateful objects of the processing*
element as an array  * @param inputDefinition  */     @Override     public Object[] currentState() {
  the incoming stream definition
     * return@param new Object[]{map};
    }attributeExpressionExecutors the executors of each function parameters
     /** @param executionPlanContext     * Used to restore serializedthe statecontext of the processingexecution element,plan
for reconstructing    *  *@return the elementadditional tooutput theattributes sameintroduced stateby asthe iffunction
was on a previous point of time.
* @param state the statefulprotected objectsList<Attribute> of the element as an array oninit(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
     *   debugModeOn = LOGGER.isDebugEnabled();
        the same order provided by currentState().if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
       */     @Overridethrow new ExecutionPlanCreationException("First parameter should publicbe void restoreState(Object[] state) {of type string");
 map = (ConcurrentHashMap<String, StreamEvent>) state[0];   ArrayList<Attribute> attributes }= new ArrayList<Attribute>(6);
   /**      * To find events from the processor event pool, that the matches the matchingEvent based on finder logic.
     *attributes.add(new Attribute("formattedAddress", Attribute.Type.STRING));
        attributes.add(new Attribute("latitude", Attribute.Type.DOUBLE));
        attributes.add(new Attribute("longitude", Attribute.Type.DOUBLE));
     * @param matchingEvent thereturn eventattributes;
to be matched with the}
  at the processor/**
     * @paramThis finderwill be called only once and this can thebe executionused elementto responsibleacquire
for finding the corresponding events that* matchesrequired resources for the processing element.
*     * This will be called after initializing the system and before
     * thestarting matchingEventto basedprocess on poolthe of events at Processorevents.
@return the matched events @Override
    */public void start() {
		//Implement start logic to acquire relevant resources
    }
    /** * This will be called only once and this can be used to release
     * Tothe constructacquired aresources finderfor havingprocessing.
the capability of finding events at the processor that corresponds to* This will be called before shutting down the incomingsystem.
matchingEvent and the given matching@Override
expression logic.   public void stop() *{
		//Implement stop logic to release the acquired resources
    }

     * theUsed matchingto expressioncollect the serializable state of the *processing @paramelement, metaComplexEventthat need to be
     * persisted for the reconstructing metathe structureelement ofto the incomingsame matchingEventstate on a different point of *time
@param executionPlanContext    *
   current execution plan* context@return stateful objects of the processing *element as @paraman variableExpressionExecutorsarray
the list of variable ExpressionExecutors already*/
created    @Override
  * @param matchingStreamIndex/**
     * Used to therestore streamserialized indexstate of the incoming matchingEventprocessing element, for reconstructing
     * @paramthe withinTimeelement to the same state as if was on a previous point of time.
    the maximum*
time gap between the events to* be@param matchedstate the stateful objects of the *element @returnas finderan havingarray theon
capability of finding events at the processor against the expression and incoming
     * matchingEvent     *              the same order provided by currentState().
    public void restoreState(Object[] state) {
		//Implement restore state logic.
    }
}
        return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime);


    private String generateKey(StreamEvent event) {
        StringBuilder stringBuilder = new StringBuilder();
        for (VariableExpressionExecutor executor : variableExpressionExecutors) {
        return stringBuilder.toString();

Object[] state) {
		//Implement restore state logic.

Sample geo.siddhiext extension mapping file for the custom stream function extension can be found below;

