Writing a Custom Transformer
To write a custom transformer, create a class extending "org.wso2.siddhi.core.query.processor.transform.TransformProcessor" add the SiddhiExtionsion Annotation, compile that class, and add the jar to the class path <CEP_HOME>/repository/components/lib. Then add the fully-qualified class name for the implementation class in a new line, to the siddhi.extension file located at <CEP_HOME>/repository/conf/siddhi.
For example, if you have created the extension with namespace debs and function name getVelocities, you can use that in a query as follows:Â
from debsStream#transform.debs:getVelocities(v,vx,vy,vz) select velocityX, velocityY, velocityZ insert into velocityStream;
Given below is a sample code that implements the above function.
package org.wso2.siddhi.extension; import org.apache.log4j.Logger; import org.wso2.siddhi.core.config.SiddhiContext; import org.wso2.siddhi.core.event.Event; import org.wso2.siddhi.core.event.in.InEvent; import org.wso2.siddhi.core.event.in.InListEvent; import org.wso2.siddhi.core.event.in.InStream; import org.wso2.siddhi.core.executor.expression.ExpressionExecutor; import org.wso2.siddhi.core.query.processor.transform.TransformProcessor; import org.wso2.siddhi.query.api.definition.Attribute; import org.wso2.siddhi.query.api.definition.StreamDefinition; import org.wso2.siddhi.query.api.expression.Expression; import org.wso2.siddhi.query.api.expression.Variable; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; import java.util.HashMap; import java.util.List; import java.util.Map; @SiddhiExtension(namespace = "debs", function = "getVelocities") public class VelocityTransformProcessor extends TransformProcessor { private Map<String, Integer> paramPositions = new HashMap<String, Integer>(); public VelocityTransformProcessor() { this.outStreamDefinition = new StreamDefinition().name("velocityStream") .attribute("velocityX", Attribute.Type.DOUBLE) .attribute("velocityY", Attribute.Type.DOUBLE) .attribute("velocityZ", Attribute.Type.DOUBLE); } @Override protected InStream processEvent(InEvent inEvent) { double vMagnitude = (Double) inEvent.getData(paramPositions.get("v")); double vxComponent = (Double) inEvent.getData(paramPositions.get("vx")); double vyComponent = (Double) inEvent.getData(paramPositions.get("vy")); double vzComponent = (Double) inEvent.getData(paramPositions.get("vz")); Object[] data = new Object[]{vMagnitude * vxComponent / 10000000000L, vMagnitude * vyComponent / 10000000000L, vMagnitude * vzComponent / 10000000000L}; return new InEvent(inEvent.getStreamId(), System.currentTimeMillis(), data); } @Override protected InStream processEvent(InListEvent inListEvent) { InListEvent transformedListEvent = new InListEvent(); for (Event event : inListEvent.getEvents()) { if (event instanceof InEvent) { transformedListEvent.addEvent((Event) processEvent((InEvent) event)); } } return transformedListEvent; } @Override protected Object[] currentState() { return new Object[]{paramPositions}; } @Override protected void restoreState(Object[] objects) { if (objects.length > 0 && objects[0] instanceof Map) { paramPositions = (Map<String, Integer>) objects[0]; } } @Override protected void init(Expression[] parameters, List<ExpressionExecutor> expressionExecutors, StreamDefinition inStreamDefinition, StreamDefinition outStreamDefinition, String elementId, SiddhiContext siddhiContext) { for (Expression parameter : parameters) { if (parameter instanceof Variable) { Variable var = (Variable) parameter; String attributeName = var.getAttributeName(); paramPositions.put(attributeName, inStreamDefinition.getAttributePosition(attributeName)); } } } @Override public void destroy() { } }
To make CEP aware of the above extension, the following line should go into the <CEP_HOME>/repository/conf/siddhi/siddhi.extension file.
org.wso2.siddhi.extension.VelocityTransformProcessor |
---|
Note
If you are using CEP 3.0.0, please download and apply the following patch for siddhi which contains a fix for CEP-628