Writing a Custom Stream Processor Extension
The Siddhi Stream Processor Extension lets you collect and expire events while altering the event format (for example, by adding output attributes) based on the given input parameters.
To implement a custom stream processor, follow the procedure below.
- Create a class extending org.wso2.siddhi.core.query.processor.stream.StreamProcessor.
- Create an appropriate .siddhiext extension mapping file.
- Compile the class.
- Build the jar containing the .class and the .siddhiext files.
- Add the jar to the Siddhi class path. If you need to run the extension on WSO2 CEP, add the jar to the <CEP_HOME>/repository/components/dropins directory.
For example, a Stream Processor extension created with timeseries as the namespace and regress as the function name can be referred to in a query as shown below.
from baseballData#timeseries:regress(2, 10000, 0.95, salary, rbi, walks, strikeouts, errors) select * insert into regResults;
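To make the argument layout of this query concrete, the following minimal sketch mirrors the attribute-naming logic of the init method in the sample implementation on this page: when the argument list starts with constants, the first three arguments are treated as the calculation interval, batch size and confidence interval, and the remaining arguments are the y and x variables. The class and method names here are hypothetical and are not part of Siddhi.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the naming logic of init() in the sample; not part of Siddhi.
final class RegressOutputAttributesSketch {

    /**
     * @param argumentCount       total number of arguments passed to timeseries:regress
     * @param startsWithConstants true when calcInterval, batchSize and the confidence
     *                            interval lead the argument list
     * @return the names of the additional output attributes, in order
     */
    static List<String> outputAttributeNames(int argumentCount, boolean startsWithConstants) {
        // The three leading constants are configuration, not regression variables.
        int paramCount = startsWithConstants ? argumentCount - 3 : argumentCount;
        List<String> names = new ArrayList<>();
        names.add("stderr");
        for (int i = 0; i < paramCount; i++) {
            names.add("beta" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        // regress(2, 10000, 0.95, salary, rbi, walks, strikeouts, errors):
        // 8 arguments, of which the first 3 are constants.
        System.out.println(outputAttributeNames(8, true));
        // prints [stderr, beta0, beta1, beta2, beta3, beta4]
    }
}
```

For the query above, salary is the y variable and rbi, walks, strikeouts and errors are the x variables, so the processor adds one stderr attribute and five beta attributes.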
The following is a sample implementation of a custom stream processor extension.
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.wso2.siddhi.extension.timeseries;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.extension.timeseries.linreg.MultipleLinearRegressionCalculator;
import org.wso2.siddhi.extension.timeseries.linreg.RegressionCalculator;
import org.wso2.siddhi.extension.timeseries.linreg.SimpleLinearRegressionCalculator;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

import java.util.ArrayList;
import java.util.List;

/**
 * The methods supported by this function are
 * timeseries:regress(int/long/float/double y, int/long/float/double x1, int/long/float/double x2 ...)
 * and
 * timeseries:regress(int calcInterval, int batchSize, double confidenceInterval, int/long/float/double y, int/long/float/double x1, int/long/float/double x2 ...)
 */
public class LinearRegressionStreamProcessor extends StreamProcessor {

    private int paramCount = 0;                             // Number of x variables +1
    private int calcInterval = 1;                           // The frequency of regression calculation
    private int batchSize = 1000000000;                     // Maximum # of events, used for regression calculation
    private double ci = 0.95;                               // Confidence Interval
    private final int SIMPLE_LINREG_INPUT_PARAM_COUNT = 2;  // Number of Input parameters in a simple linear regression
    private RegressionCalculator regressionCalculator = null;
    private int paramPosition = 0;

    /**
     * The init method of the LinearRegressionStreamProcessor, this method will be called before other methods
     *
     * @param inputDefinition              the incoming stream definition
     * @param attributeExpressionExecutors the executors of each function parameters
     * @param executionPlanContext         the context of the execution plan
     * @return the additional output attributes introduced by the function
     */
    @Override
    protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        paramCount = attributeExpressionLength;

        // Capture constant inputs
        if (attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor) {
            paramCount = paramCount - 3;
            paramPosition = 3;
            try {
                calcInterval = ((Integer) attributeExpressionExecutors[0].execute(null));
                batchSize = ((Integer) attributeExpressionExecutors[1].execute(null));
            } catch (ClassCastException c) {
                throw new ExecutionPlanCreationException("Calculation interval, batch size and range should be of type int");
            }
            try {
                ci = ((Double) attributeExpressionExecutors[2].execute(null));
            } catch (ClassCastException c) {
                throw new ExecutionPlanCreationException("Confidence interval should be of type double and a value between 0 and 1");
            }
        }

        // Pick the appropriate regression calculator
        if (paramCount > SIMPLE_LINREG_INPUT_PARAM_COUNT) {
            regressionCalculator = new MultipleLinearRegressionCalculator(paramCount, calcInterval, batchSize, ci);
        } else {
            regressionCalculator = new SimpleLinearRegressionCalculator(paramCount, calcInterval, batchSize, ci);
        }

        // Add attributes for standard error and all beta values
        String betaVal;
        ArrayList<Attribute> attributes = new ArrayList<Attribute>(paramCount);
        attributes.add(new Attribute("stderr", Attribute.Type.DOUBLE));
        for (int itr = 0; itr < paramCount; itr++) {
            betaVal = "beta" + itr;
            attributes.add(new Attribute(betaVal, Attribute.Type.DOUBLE));
        }
        return attributes;
    }

    /**
     * The main processing method that will be called upon event arrival
     *
     * @param streamEventChunk      the event chunk that need to be processed
     * @param nextProcessor         the next processor to which the success events need to be passed
     * @param streamEventCloner     helps to clone the incoming event for local storage or modification
     * @param complexEventPopulater helps to populate the events with the resultant attributes
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        while (streamEventChunk.hasNext()) {
            ComplexEvent complexEvent = streamEventChunk.next();

            Object[] inputData = new Object[attributeExpressionLength - paramPosition];
            for (int i = paramPosition; i < attributeExpressionLength; i++) {
                inputData[i - paramPosition] = attributeExpressionExecutors[i].execute(complexEvent);
            }
            Object[] outputData = regressionCalculator.calculateLinearRegression(inputData);

            // Skip processing if user has specified calculation interval
            if (outputData == null) {
                streamEventChunk.remove();
            } else {
                complexEventPopulater.populateComplexEvent(complexEvent, outputData);
            }
        }
        nextProcessor.process(streamEventChunk);
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that need to be
     * persisted for the reconstructing the element to the same state on a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[0];
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as if was on a previous point of time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        //Implement restore state logic
    }
}
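In the process method above, an event is removed from the chunk whenever the calculator returns null, which is how the calculation interval takes effect. The calculator classes themselves are not shown on this page, so the following is only a hypothetical, self-contained stand-in that illustrates that contract for a simple (one x variable) regression. The class name, method name and internal logic are assumptions, and it omits the standard error and confidence interval that the real extension reports.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in; NOT the SimpleLinearRegressionCalculator used by the extension.
final class IntervalRegressionSketch {
    private final int calcInterval;                              // emit a result every calcInterval events
    private final int batchSize;                                 // maximum number of events kept for the fit
    private final Deque<double[]> window = new ArrayDeque<>();   // {x, y} pairs
    private long eventCount = 0;

    IntervalRegressionSketch(int calcInterval, int batchSize) {
        if (calcInterval < 1 || batchSize < 2) {
            throw new IllegalArgumentException("calcInterval must be >= 1 and batchSize >= 2");
        }
        this.calcInterval = calcInterval;
        this.batchSize = batchSize;
    }

    /** inputData = {y, x}; returns {beta0, beta1}, or null when this event produces no output. */
    Object[] calculate(Object[] inputData) {
        double y = ((Number) inputData[0]).doubleValue();
        double x = ((Number) inputData[1]).doubleValue();
        window.addLast(new double[] {x, y});
        if (window.size() > batchSize) {
            window.removeFirst();                                // expire the oldest event
        }
        eventCount++;
        if (eventCount % calcInterval != 0 || window.size() < 2) {
            return null;                                         // the caller drops this event
        }
        double sumX = 0, sumY = 0, sumXY = 0, sumXX = 0;
        int n = window.size();
        for (double[] p : window) {
            sumX += p[0];
            sumY += p[1];
            sumXY += p[0] * p[1];
            sumXX += p[0] * p[0];
        }
        double denominator = n * sumXX - sumX * sumX;
        if (denominator == 0.0) {
            return null;                                         // all x values equal, slope undefined
        }
        double beta1 = (n * sumXY - sumX * sumY) / denominator;  // ordinary least-squares slope
        double beta0 = (sumY - beta1 * sumX) / n;                // intercept
        return new Object[] {beta0, beta1};
    }

    public static void main(String[] args) {
        IntervalRegressionSketch calc = new IntervalRegressionSketch(2, 10);
        System.out.println(calc.calculate(new Object[] {3, 1}) == null); // first event is skipped
        Object[] out = calc.calculate(new Object[] {5, 2});              // points lie on y = 2x + 1
        System.out.println(out[0] + " " + out[1]);
    }
}
```

Returning null rather than an empty array keeps the skip case cheap to detect in the processor, at the cost of the caller having to check for null on every event.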
A sample timeseries.siddhiext extension mapping file for the custom stream processor extension is shown below.
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
regress=org.wso2.siddhi.extension.timeseries.LinearRegressionStreamProcessor
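The mapping file uses a key=value layout in which the key is the function name and the value is the implementing class. The following sketch shows how a reference such as timeseries:regress could be resolved against such a file, assuming (as the example suggests) that the namespace is taken from the file name. This is not Siddhi's actual extension loader; the class and method names are hypothetical and only java.util.Properties is used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: not Siddhi's actual extension loading code.
final class SiddhiExtMappingSketch {
    private static final String SUFFIX = ".siddhiext";

    /**
     * Resolves a "namespace:function" reference to a class name.
     *
     * @param fileName     mapping file name, e.g. timeseries.siddhiext (namespace assumed from the name)
     * @param fileContents text of the mapping file
     * @param reference    reference as used in a query, e.g. timeseries:regress
     * @return the mapped class name, or null if the namespace or function does not match
     */
    static String resolve(String fileName, String fileContents, String reference) {
        if (!fileName.endsWith(SUFFIX)) {
            return null;
        }
        String namespace = fileName.substring(0, fileName.length() - SUFFIX.length());
        Properties mapping = new Properties();
        try {
            mapping.load(new StringReader(fileContents)); // lines starting with '#' are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String[] parts = reference.split(":", 2);
        if (parts.length != 2 || !parts[0].equals(namespace)) {
            return null;
        }
        return mapping.getProperty(parts[1]);
    }

    public static void main(String[] args) {
        String contents = "# license header omitted\n"
                + "regress=org.wso2.siddhi.extension.timeseries.LinearRegressionStreamProcessor\n";
        System.out.println(resolve("timeseries.siddhiext", contents, "timeseries:regress"));
        // prints org.wso2.siddhi.extension.timeseries.LinearRegressionStreamProcessor
    }
}
```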