To write a custom Window, create a class extending "The Stream Function Extension allows events to be modified by adding one or more attributes to it. Events can be output upon each event arrival.
To implement a custom stream function, follow the procedure below.
- Create a class extending
org.wso2.siddhi.core.query.processor.stream.function.
...
StreamFunctionProcessor
.- Create an appropriate
.siddhiext
extension mapping file
...
- .
- Compile the class
...
- .
- Build the jar containing
...
- the
.class
and the.siddhiext
files. - Add
...
- the jar to the Siddhi class path.
...
- If you need to run the extension on WSO2 DAS, add the jar to the
<DAS_HOME>/repository/components/
...
- dropins directory.
For example, Window a Stream Function extension created with namespace "custom" and function name "lastUnique" geo
as the namespace and geocode
as the function name can be referred in the a query as follows:shown below.
Code Block | ||||
---|---|---|---|---|
| ||||
from StockExchangeStream[price >= 20]#window.custom:lastUnique(symbol,5) select symbol, pricegeocodeStream#geo:geocode(location) select latitude, longitude, formattedAddress insert into StockQuotedataOut; |
For the Window extension to be used in a Join Query the Window Extension should be findable. To make the Window findable it should implement the "org.wso2.siddhi.core.query.processor.stream.window.FindableProcessor" Interface.
E.g. Implementation can be found below;The following is a sample implementation of a custom stream function extension.
Code Block | ||||
---|---|---|---|---|
| ||||
/* * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. * * LicensedWSO2 underInc. thelicenses Apache License,this file to you under the Apache License, * Version 2.0 (the "License"); * you may not use this file except * in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * *software distributed under the License is distributed on an * "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. * See the License for the * specific language governing permissions and limitations * limitations under the License. */ package org.wso2.siddhi.core.query.processor.stream.windowextension.geo; import org.wso2.siddhi.core.config.ExecutionPlanContextcom.google.code.geocoder.Geocoder; import com.google.code.geocoder.GeocoderRequestBuilder; import orgcom.wso2google.siddhicode.coregeocoder.eventmodel.ComplexEventGeocodeResponse; import orgcom.wso2google.siddhicode.coregeocoder.eventmodel.ComplexEventChunkGeocoderRequest; import org.wso2apache.siddhi.core.event.MetaComplexEventlog4j.Logger; import org.wso2.siddhi.core.eventconfig.stream.StreamEventExecutionPlanContext; import org.wso2.siddhi.core.eventexception.stream.StreamEventClonerExecutionPlanCreationException; import org.wso2.siddhi.core.executorexception.ExpressionExecutorExecutionPlanRuntimeException; import org.wso2.siddhi.core.executor.VariableExpressionExecutorExpressionExecutor; import org.wso2.siddhi.core.query.processor.stream.function.ProcessorStreamFunctionProcessor; import org.wso2.siddhi.corequery.api.tabledefinition.EventTableAbstractDefinition; import org.wso2.siddhi.corequery.utilapi.collectiondefinition.operator.FinderAttribute; import orgjava.wso2.siddhi.core.util.parser.CollectionOperatorParser; import org.wso2.siddhi.query.api.expression.Expression; io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class LastUniqueWindowProcessor extends WindowProcessor implements FindableProcessor{ private ConcurrentHashMap<String, StreamEvent> map = new ConcurrentHashMap<String, StreamEvent>(); /** * This extension transforms a location into its geo-coordinates and formatted * address */ public class GeocodeStreamFunctionProcessor extends StreamFunctionProcessor { private VariableExpressionExecutor[] variableExpressionExecutors; /**static final Logger LOGGER = Logger.getLogger(GeocodeStreamFunctionProcessor.class); private final Geocoder * The init method of the WindowProcessor, this method will be called before other methodsgeocoder = new Geocoder(); private boolean debugModeOn; /** * The process method of the *GeocodeStreamFunctionProcessor, @paramused attributeExpressionExecutorswhen themore executorsthan of eachone function parameters are provided * @param executionPlanContext * @param data the data thevalues contextfor of the executionfunction planparameters */ @return the data for @Overrideadditional output attributes introduced by protectedthe voidfunction init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { */ @Override variableExpressionExecutors = newprotected VariableExpressionExecutorObject[attributeExpressionExecutors.length]; process(Object[] data) { for (int i =return process(data[0]); i < attributeExpressionExecutors.length; i++) {} /** * variableExpressionExecutors[i] =(VariableExpressionExecutor) attributeExpressionExecutors[i]; } } /*The process method of the GeocodeStreamFunctionProcessor, used when zero or one function parameter is provided * * @param Thedata mainnull processingif methodthe thatfunction willparameter becount calledis uponzero eventor arrivalruntime data value of the function *parameter * @param@return streamEventChunkthe data thefor streamadditional eventoutput chunkattribute thatintroduced needby tothe befunction processed */ @param nextProcessor @Override the next processor toprotected which the success events need to be passedObject[] process(Object data) { String *location @param streamEventCloner helps to clone the incoming event for local storage or modification */= data.toString(); // Make the geocode request to API library @Override GeocoderRequest protectedgeocoderRequest synchronized= voidnew processGeocoderRequestBuilder(ComplexEventChunk<StreamEvent>) streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) { ComplexEventChunk<StreamEvent> complexEventChunk =.setAddress(location) new ComplexEventChunk<StreamEvent>(); StreamEvent streamEvent = streamEventChunk.getFirst(); .setLanguage("en") while (streamEvent != null) { .getGeocoderRequest(); StreamEvent clonedEventdouble = streamEventCloner.copyStreamEvent(streamEvent);latitude, longitude; String formattedAddress; clonedEvent.setType(StreamEvent.Type.EXPIRED); try { StreamEvent oldEvent = map.put(generateKey(clonedEvent), clonedEvent); GeocodeResponse geocoderResponse = geocoder.geocode(geocoderRequest); if (oldEvent != null!geocoderResponse.getResults().isEmpty()) { latitude = complexEventChunk.add(oldEvent); geocoderResponse.getResults().get(0).getGeometry().getLocation() } StreamEvent next = streamEvent.getNext.getLat().doubleValue(); streamEvent.setNext(null); longitude = geocoderResponse.getResults().get(0).getGeometry().getLocation() complexEventChunk.add(streamEvent); streamEvent = next; .getLng().doubleValue(); } formattedAddress nextProcessor.process(complexEventChunk= geocoderResponse.getResults().get(0).getFormattedAddress(); } /** } else { * This will be called only once and this can be used to acquire latitude = -1.0; * required resources for the processing element. * This will be calledlongitude after initializing the system and before= -1.0; * starting to process the events. formattedAddress = "N/A"; */ @Override public void} start() { } //Do nothing catch (IOException e) { } /** throw new * This will be called only once and this can be used to releaseExecutionPlanRuntimeException("Error in connection to Google Maps API.", e); } * the acquired resources for processing. if (debugModeOn) { * This will be called before shutting down the system. LOGGER.debug("Formatted address: " */+ formattedAddress + ", Location @Overridecoordinates: (" + public void stop() { //Do nothing }latitude + ", " + longitude /** + ")"); * Used to collect the} serializable state of the processing element, that need toreturn be new Object[]{formattedAddress, latitude, longitude}; * persisted for} the reconstructing the element to the/** same state on a different point* ofThe timeinit method of the GeocodeStreamFunctionProcessor, this *method will be called before other *methods @return stateful objects of the processing* element as an array * @param inputDefinition */ @Override public Object[] currentState() { the incoming stream definition * return@param new Object[]{map}; }attributeExpressionExecutors the executors of each function parameters /** @param executionPlanContext * Used to restore serializedthe statecontext of the processingexecution element,plan for reconstructing * *@return the elementadditional tooutput theattributes sameintroduced stateby asthe iffunction was on a previous point of time. **/ @Override * @param state the statefulprotected objectsList<Attribute> of the element as an array oninit(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) { * debugModeOn = LOGGER.isDebugEnabled(); the same order provided by currentState().if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) { */ @Overridethrow new ExecutionPlanCreationException("First parameter should publicbe void restoreState(Object[] state) {of type string"); } map = (ConcurrentHashMap<String, StreamEvent>) state[0]; ArrayList<Attribute> attributes }= new ArrayList<Attribute>(6); /** * To find events from the processor event pool, that the matches the matchingEvent based on finder logic. *attributes.add(new Attribute("formattedAddress", Attribute.Type.STRING)); attributes.add(new Attribute("latitude", Attribute.Type.DOUBLE)); attributes.add(new Attribute("longitude", Attribute.Type.DOUBLE)); * @param matchingEvent thereturn eventattributes; to be matched with the} events at the processor/** * @paramThis finderwill be called only once and this can thebe executionused elementto responsibleacquire for finding the corresponding events that* matchesrequired resources for the processing element. * * This will be called after initializing the system and before * thestarting matchingEventto basedprocess on poolthe of events at Processorevents. */ @return the matched events @Override */public void start() { //Implement @Overridestart logic to acquire relevant publicresources synchronized StreamEvent find(ComplexEvent matchingEvent, Finder} finder) { /** return finder.find(matchingEvent, map.values(),streamEventCloner); } /** * This will be called only once and this can be used to release * Tothe constructacquired aresources finderfor havingprocessing. the capability of finding events at the processor that corresponds to* This will be called before shutting down the incomingsystem. */ matchingEvent and the given matching@Override expression logic. public void stop() *{ //Implement stop logic to release *the @paramacquired expressionresources } /** * theUsed matchingto expressioncollect the serializable state of the *processing @paramelement, metaComplexEventthat need to be * persisted for the reconstructing metathe structureelement ofto the incomingsame matchingEventstate on a different point of *time @param executionPlanContext * current execution plan* context@return stateful objects of the processing *element as @paraman variableExpressionExecutorsarray the list of variable ExpressionExecutors already*/ created @Override * @param eventTableMap public Object[] currentState() { return new mapObject[0]; of event tables } * @param matchingStreamIndex/** * Used to therestore streamserialized indexstate of the incoming matchingEventprocessing element, for reconstructing * @paramthe withinTimeelement to the same state as if was on a previous point of time. the maximum* time gap between the events to* be@param matchedstate the stateful objects of the *element @returnas finderan havingarray theon capability of finding events at the processor against the expression and incoming * matchingEvent * the same order provided by currentState(). */ @Override public Findervoid constructFinderrestoreState(Expression expression, MetaComplexEvent metaComplexEvent, ExecutionPlanContext executionPlanContext, List<VariableExpressionExecutor> variableExpressionExecutors, Map<String, EventTable> eventTableMap, int matchingStreamIndex, long withinTime) { return CollectionOperatorParser.parse(expression, metaComplexEvent, executionPlanContext, variableExpressionExecutors, eventTableMap, matchingStreamIndex, inputDefinition, withinTime); } private String generateKey(StreamEvent event) { StringBuilder stringBuilder = new StringBuilder(); for (VariableExpressionExecutor executor : variableExpressionExecutors) { stringBuilder.append(event.getAttribute(executor.getPosition())); } return stringBuilder.toString(); } } Object[] state) { //Implement restore state logic. } } |
Sample geo.siddhiext extension mapping file for the custom stream function extension can be found below;
Code Block | ||
---|---|---|
| ||
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
geocode=org.wso2.siddhi.extension.geo.GeocodeStreamFunctionProcessor |