A Siddhi stream processor extension allows events to be collected and expired, and can modify the event format based on the given input parameters.
To implement a custom stream processor, follow the procedure below.
1. Create a class that extends `org.wso2.siddhi.core.query.processor.stream.StreamProcessor`.
2. Create an appropriate `.siddhiext` extension mapping file.
3. Compile the class.
4. Build a jar containing the `.class` and `.siddhiext` files.
5. Add the jar to the Siddhi class path. If you need to run the extension on WSO2 CEP, add the jar to the `<CEP_HOME>/repository/components/dropins` directory.
For example, a stream processor extension created with `timeseries` as the namespace and `regress` as the function name can be referenced in a query as shown below.
Code Block | ||||
---|---|---|---|---|
| ||||
from baseballData#timeseries:regress(2, 10000, 0.95, salary, rbi, walks, strikeouts, errors)
select *
insert into regResults; |
The following is a sample implementation of a custom stream processor extension.
Code Block | ||||
---|---|---|---|---|
| ||||
/*
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.siddhi.extension.timeseries;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.ExecutionPlanCreationException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.extension.timeseries.linreg.MultipleLinearRegressionCalculator;
import org.wso2.siddhi.extension.timeseries.linreg.RegressionCalculator;
import org.wso2.siddhi.extension.timeseries.linreg.SimpleLinearRegressionCalculator;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.List;
/**
* The methods supported by this function are
* timeseries:regress(int/long/float/double y, int/long/float/double x1, int/long/float/double x2 ...)
* and
* timeseries:regress(int calcInterval, int batchSize, double confidenceInterval, int/long/float/double y, int/long/float/double x1, int/long/float/double x2 ...)
*/
public class LinearRegressionStreamProcessor extends StreamProcessor {
private int paramCount = 0; // Number of x variables +1
private int calcInterval = 1; // The frequency of regression calculation
private int batchSize = 1000000000; // Maximum # of events, used for regression calculation
private double ci = 0.95; // Confidence Interval
private final int SIMPLE_LINREG_INPUT_PARAM_COUNT = 2; // Number of Input parameters in a simple linear regression
private RegressionCalculator regressionCalculator = null;
private int paramPosition = 0;
/**
* The init method of the LinearRegressionStreamProcessor, this method will be called before other methods
*
* @param inputDefinition the incoming stream definition
* @param attributeExpressionExecutors the executors of each function parameters
* @param executionPlanContext the context of the execution plan
* @return the additional output attributes introduced by the function
*/
@Override
protected List<Attribute> init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
paramCount = attributeExpressionLength;
// Capture constant inputs
if (attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor){
paramCount = paramCount - 3;
paramPosition = 3;
try {
calcInterval = ((Integer)attributeExpressionExecutors[0].execute(null));
batchSize = ((Integer)attributeExpressionExecutors[1].execute(null));
} catch(ClassCastException c) {
throw new ExecutionPlanCreationException("Calculation interval, batch size and range should be of type int");
}
try {
ci = ((Double)attributeExpressionExecutors[2].execute(null));
} catch(ClassCastException c) {
throw new ExecutionPlanCreationException("Confidence interval should be of type double and a value between 0 and 1");
}
}
// Pick the appropriate regression calculator
if (paramCount > SIMPLE_LINREG_INPUT_PARAM_COUNT) {
regressionCalculator = new MultipleLinearRegressionCalculator(paramCount, calcInterval, batchSize, ci);
} else {
regressionCalculator = new SimpleLinearRegressionCalculator(paramCount, calcInterval, batchSize, ci);
}
// Add attributes for standard error and all beta values
String betaVal;
ArrayList<Attribute> attributes = new ArrayList<Attribute>(paramCount);
attributes.add(new Attribute("stderr", Attribute.Type.DOUBLE));
for (int itr = 0; itr < paramCount; itr++) {
betaVal = "beta" + itr;
attributes.add(new Attribute(betaVal, Attribute.Type.DOUBLE));
}
return attributes;
}
/**
* The main processing method that will be called upon event arrival
*
* @param streamEventChunk the event chunk that need to be processed
* @param nextProcessor the next processor to which the success events need to be passed
* @param streamEventCloner helps to clone the incoming event for local storage or modification
* @param complexEventPopulater helps to populate the events with the resultant attributes
*/
@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
while (streamEventChunk.hasNext()) {
ComplexEvent complexEvent = streamEventChunk.next();
Object[] inputData = new Object[attributeExpressionLength-paramPosition];
for (int i = paramPosition; i < attributeExpressionLength; i++) {
inputData[i-paramPosition] = attributeExpressionExecutors[i].execute(complexEvent);
}
Object[] outputData = regressionCalculator.calculateLinearRegression(inputData);
// Skip processing if user has specified calculation interval
if (outputData == null) {
streamEventChunk.remove();
} else {
complexEventPopulater.populateComplexEvent(complexEvent, outputData);
}
}
nextProcessor.process(streamEventChunk);
}
/**
* This will be called only once and this can be used to acquire
* required resources for the processing element.
* This will be called after initializing the system and before
* starting to process the events.
*/
@Override
public void start() {
//Implement start logic to acquire relevant resources
}
/**
* This will be called only once and this can be used to release
* the acquired resources for processing.
* This will be called before shutting down the system.
*/
@Override
public void stop() {
//Implement stop logic to release the acquired resources
}
/**
* Used to collect the serializable state of the processing element, that need to be
* persisted for the reconstructing the element to the same state on a different point of time
*
* @return stateful objects of the processing element as an array
*/
@Override
public Object[] currentState() {
return new Object[0];
}
/**
* Used to restore serialized state of the processing element, for reconstructing
* the element to the same state as if was on a previous point of time.
*
* @param state the stateful objects of the element as an array on
* the same order provided by currentState().
*/
@Override
public void restoreState(Object[] state) {
//Implement restore state logic
}
} |
The sample `timeseries.siddhiext` extension mapping file for this custom stream processor extension is shown below:
Code Block | ||
---|---|---|
| ||
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
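#
# Each entry maps a function name to its implementing class. The namespace
# (timeseries) comes from this file's name, so queries call the function as
# timeseries:regress.
#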
regress=org.wso2.siddhi.extension.timeseries.LinearRegressionStreamProcessor |