Writing a Custom Stream Processor Extension
A Siddhi Stream Processor Extension allows events to be collected and expired while also altering the event format, based on the given input parameters.
To implement a custom stream processor, create a class extending "org.wso2.siddhi.core.query.processor.stream.StreamProcessor", create an appropriate .siddhiext extension mapping file, compile the class, and build a jar containing the .class and .siddhiext files. Then add the jar to the Siddhi class path. When running on WSO2 CEP, add the jar to <CEP_HOME>/repository/components/lib.
For example, a Stream Processor Extension created with the namespace "timeseries" and the function name "regress" can be referred to in a query as follows:
from baseballData#timeseries:regress(2, 10000, 0.95, salary, rbi, walks, strikeouts, errors) select * insert into regResults;

A sample implementation of a custom stream processor extension can be found below:
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.wso2.siddhi.extension.timeseries;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.extension.timeseries.linreg.MultipleLinearRegressionCalculator;
import org.wso2.siddhi.extension.timeseries.linreg.RegressionCalculator;
import org.wso2.siddhi.extension.timeseries.linreg.SimpleLinearRegressionCalculator;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.util.ArrayList;
import java.util.List;

/**
 * Stream processor that feeds the variable attributes of each incoming event into a
 * {@link RegressionCalculator} and appends the regression output to the event.
 * <p>
 * The methods supported by this function are
 * <pre>
 * timeseries:regress(int/long/float/double y, int/long/float/double x1, int/long/float/double x2 ...)
 * </pre>
 * and
 * <pre>
 * timeseries:regress(int calcInterval, int batchSize, double confidenceInterval,
 *                    int/long/float/double y, int/long/float/double x1, int/long/float/double x2 ...)
 * </pre>
 * The second form is selected when the first argument is a constant.
 */
public class LinearRegressionStreamProcessor extends StreamProcessor {

    /** Number of input variables (y and one x) in a simple linear regression. */
    private static final int SIMPLE_LINREG_INPUT_PARAM_COUNT = 2;

    /** Number of leading constant arguments (calcInterval, batchSize, confidenceInterval). */
    private static final int CONSTANT_PARAM_COUNT = 3;

    private int paramCount = 0;          // Number of x variables + 1
    private int calcInterval = 1;        // The frequency of regression calculation
    private int batchSize = 1000000000;  // Maximum # of events, used for regression calculation
    private double ci = 0.95;            // Confidence interval
    private RegressionCalculator regressionCalculator = null;
    private int paramPosition = 0;       // Index of the first variable (non-constant) argument

    /**
     * The init method of the LinearRegressionStreamProcessor, this method will be called before other methods.
     * <p>
     * Reads the optional leading constants, validates the argument list, selects the regression
     * calculator and declares the output attributes: {@code stderr} followed by
     * {@code beta0 .. beta(paramCount - 1)}, all of type double.
     *
     * @param inputDefinition              the incoming stream definition
     * @param attributeExpressionExecutors the executors of each function parameters
     * @param executionPlanContext         the context of the execution plan
     * @return the additional output attributes introduced by the function
     * @throws ExecutionPlanCreationException if the constants are missing, non-constant, of the wrong
     *                                        type or out of range, or if fewer than two variable
     *                                        attributes (y and x1) are supplied
     */
    @Override
    protected List<Attribute> init(AbstractDefinition inputDefinition,
                                   ExpressionExecutor[] attributeExpressionExecutors,
                                   ExecutionPlanContext executionPlanContext) {
        paramCount = attributeExpressionLength;

        // Capture constant inputs; the length guard avoids indexing into an empty argument list
        if (attributeExpressionExecutors.length > 0
                && attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor) {
            if (attributeExpressionExecutors.length < CONSTANT_PARAM_COUNT) {
                throw new ExecutionPlanCreationException("Calculation interval, batch size and confidence "
                        + "interval must all be provided when the first parameter is a constant");
            }
            paramCount = paramCount - CONSTANT_PARAM_COUNT;
            paramPosition = CONSTANT_PARAM_COUNT;

            Object intervalValue = constantValue(attributeExpressionExecutors[0], "Calculation interval");
            Object batchValue = constantValue(attributeExpressionExecutors[1], "Batch size");
            if (!(intervalValue instanceof Integer) || !(batchValue instanceof Integer)) {
                throw new ExecutionPlanCreationException(
                        "Calculation interval and batch size should be of type int");
            }
            calcInterval = (Integer) intervalValue;
            batchSize = (Integer) batchValue;

            Object confidenceValue = constantValue(attributeExpressionExecutors[2], "Confidence interval");
            // The negated range test also rejects NaN
            if (!(confidenceValue instanceof Double)
                    || !((Double) confidenceValue >= 0 && (Double) confidenceValue <= 1)) {
                throw new ExecutionPlanCreationException(
                        "Confidence interval should be of type double and a value between 0 and 1");
            }
            ci = (Double) confidenceValue;
        }

        // A regression needs the dependent variable y and at least one independent variable x
        if (paramCount < SIMPLE_LINREG_INPUT_PARAM_COUNT) {
            throw new ExecutionPlanCreationException("At least " + SIMPLE_LINREG_INPUT_PARAM_COUNT
                    + " variable attributes (y and x1) are required but found " + paramCount);
        }

        // Pick the appropriate regression calculator
        if (paramCount > SIMPLE_LINREG_INPUT_PARAM_COUNT) {
            regressionCalculator = new MultipleLinearRegressionCalculator(paramCount, calcInterval, batchSize, ci);
        } else {
            regressionCalculator = new SimpleLinearRegressionCalculator(paramCount, calcInterval, batchSize, ci);
        }

        // Add attributes for standard error and all beta values (paramCount betas + stderr)
        List<Attribute> attributes = new ArrayList<Attribute>(paramCount + 1);
        attributes.add(new Attribute("stderr", Attribute.Type.DOUBLE));
        for (int itr = 0; itr < paramCount; itr++) {
            attributes.add(new Attribute("beta" + itr, Attribute.Type.DOUBLE));
        }
        return attributes;
    }

    /**
     * Evaluates an executor that must be a constant and returns its value.
     *
     * @param executor      the executor of the function parameter
     * @param parameterName the parameter name used in the error message
     * @return the constant value produced by the executor
     * @throws ExecutionPlanCreationException if the executor is not a constant
     */
    private static Object constantValue(ExpressionExecutor executor, String parameterName) {
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new ExecutionPlanCreationException(parameterName + " should be a constant");
        }
        // A constant executor does not need an event to produce its value
        return executor.execute(null);
    }

    /**
     * The main processing method that will be called upon event arrival.
     * <p>
     * For every event the variable attributes are evaluated and handed to the regression calculator.
     * Events for which the calculator returns {@code null} are removed from the chunk; all others are
     * populated with the calculator output. The remaining chunk is passed to the next processor.
     *
     * @param streamEventChunk      the event chunk that need to be processed
     * @param nextProcessor         the next processor to which the success events need to be passed
     * @param streamEventCloner     helps to clone the incoming event for local storage or modification
     * @param complexEventPopulater helps to populate the events with the resultant attributes
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
                           StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        final int variableCount = attributeExpressionLength - paramPosition;
        while (streamEventChunk.hasNext()) {
            ComplexEvent complexEvent = streamEventChunk.next();

            // Evaluate only the variable arguments; the leading constants were consumed in init
            Object[] inputData = new Object[variableCount];
            for (int i = 0; i < variableCount; i++) {
                inputData[i] = attributeExpressionExecutors[paramPosition + i].execute(complexEvent);
            }

            Object[] outputData = regressionCalculator.calculateLinearRegression(inputData);

            // Skip processing if user has specified calculation interval
            if (outputData == null) {
                streamEventChunk.remove();
            } else {
                complexEventPopulater.populateComplexEvent(complexEvent, outputData);
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        // Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        // Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that need to be
     * persisted for reconstructing the element to the same state at a different point of time.
     * This sample does not persist any state and returns an empty array.
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[0];
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as it was at a previous point of time.
     *
     * @param state the stateful objects of the element as an array in
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        // Implement restore state logic
    }
}

A sample timeseries.siddhiext extension mapping file for the custom stream processor extension can be found below:

#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

regress=org.wso2.siddhi.extension.timeseries.LinearRegressionStreamProcessor