To write a custom output attribute aggregator, create a class implementing "org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory" and add SiddhiExtension annotation to this class. Also create appropriate an OutputAttributeAggregator extending "org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator", which is created by the OutputAttributeAggregatorFactory based on the Attribute Type (Int, String, Long, Bool, Double, Float, etc). Then compile these classes and add the jar files to the class path at <CEP_HOME>/repository/components/lib. Then add the fully-qualified class name for the OutputAttributeAggregatorFactory implementation class in a new line, to the siddhi.extension file located at <CEP_HOME>/repository/conf/siddhi.
For example, if you have created the extension with namespace currency and function name fromUSDtoEUR, you can use that in the query as follows:
from StockExchangeStream[price >= 20] insert into StockQuoteStream symbol, price as priceInUSD, currency:fromUSDtoEUR(price, 0.75) as priceInEUDR
Given below is a sample code that implements the above function.
package org.wso2.siddhi.extension; import org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory; import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator; import org.wso2.siddhi.query.api.definition.Attribute.Type; import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; @SiddhiExtension(namespace = "currency", function = "fromUSDtoEUR") public class CurrencyConversionAggregatorFactory implements OutputAttributeAggregatorFactory { @Override public OutputAttributeAggregator createAttributeAggregator(Type[] types) { return new CurrencyConversionAggregatorDouble(); } }
This is the OutputAttributeAggregator class which implements the actual logic.
package org.wso2.siddhi.extension; import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator; import org.wso2.siddhi.query.api.definition.Attribute.Type; import java.text.DecimalFormat; public class CurrencyConversionAggregatorDouble implements OutputAttributeAggregator { private static final long serialVersionUID = 1358667430272544590L; @Override public Type getReturnType() { return Type.DOUBLE; } @Override public Object processAdd(Object obj) { double amountEUR = 0D; if (obj instanceof Object[]) { Object[] objArray = (Object[]) obj; double amountUSD = (Double)objArray[0]; double conversionRate = (Double)objArray[1]; amountEUR = Double.valueOf(new DecimalFormat("#.##").format(amountUSD * conversionRate)); } return amountEUR; } @Override public Object processRemove(Object obj) { double amountEUR = 0D; if (obj instanceof Object[]) { Object[] objArray = (Object[]) obj; double amountUSD = (Double)objArray[0]; double conversionRate = (Double)objArray[1]; amountEUR = Double.valueOf(new DecimalFormat("#.##").format(amountUSD * conversionRate)); } return amountEUR; } @Override public OutputAttributeAggregator newInstance() { return new CurrencyConversionAggregatorDouble(); } }