Writing a Custom OutputAttributeAggregator
To write a custom output attribute aggregator, create a class that implements org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory and annotate it with the SiddhiExtension annotation. Also create an appropriate aggregator class that implements org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator; the factory creates it based on the attribute type (Int, String, Long, Bool, Double, Float, etc.). Then compile these classes and add the resulting JAR file to the class path in the <CEP_HOME>/repository/components/lib/ directory. Finally, add the fully qualified name of the class that implements OutputAttributeAggregatorFactory on a new line in the <CEP_HOME>/repository/conf/siddhi/siddhi.extension file.
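The idea of a factory choosing an aggregator from the attribute type can be sketched roughly as follows. This is only an illustration under stated assumptions: AttrType, SketchAggregator, DoubleConversionSketch, FloatConversionSketch and TypeDispatchingFactorySketch are hypothetical stand-ins so that the code compiles without Siddhi on the class path. A real extension would use Attribute.Type and the Siddhi interfaces named above instead.

```java
// Hypothetical sketch: every type in this file is a stand-in, not part of Siddhi.
final class TypeDispatchingFactorySketch {

    // Mirrors the shape of createAttributeAggregator(Type[] types):
    // the type of the first argument decides which implementation is returned.
    static SketchAggregator create(AttrType[] types) {
        if (types == null || types.length == 0) {
            throw new IllegalArgumentException("At least one attribute type is required");
        }
        switch (types[0]) {
            case DOUBLE:
                return new DoubleConversionSketch();
            case FLOAT:
                return new FloatConversionSketch();
            default:
                throw new UnsupportedOperationException(
                        "No aggregator available for attribute type " + types[0]);
        }
    }

    public static void main(String[] args) {
        SketchAggregator aggregator =
                create(new AttrType[]{AttrType.DOUBLE, AttrType.DOUBLE});
        System.out.println(aggregator.getReturnType());                        // DOUBLE
        System.out.println(aggregator.processAdd(new Object[]{100.0, 0.75})); // 75.0
    }
}

// Stand-in for Siddhi's Attribute.Type (assumption for this sketch only).
enum AttrType { STRING, INT, LONG, FLOAT, DOUBLE, BOOL }

// Reduced stand-in for the aggregator contract (assumption for this sketch only).
interface SketchAggregator {
    AttrType getReturnType();

    Object processAdd(Object obj);
}

// Handles (Double amount, Double rate) argument pairs.
final class DoubleConversionSketch implements SketchAggregator {
    @Override
    public AttrType getReturnType() {
        return AttrType.DOUBLE;
    }

    @Override
    public Object processAdd(Object obj) {
        Object[] values = (Object[]) obj;
        double amount = (Double) values[0];
        double rate = (Double) values[1];
        return amount * rate;
    }
}

// Handles a Float amount; the rate may be any numeric type.
final class FloatConversionSketch implements SketchAggregator {
    @Override
    public AttrType getReturnType() {
        return AttrType.FLOAT;
    }

    @Override
    public Object processAdd(Object obj) {
        Object[] values = (Object[]) obj;
        float amount = (Float) values[0];
        double rate = ((Number) values[1]).doubleValue();
        return (float) (amount * rate);
    }
}
```

Rejecting unsupported types in the factory, as this sketch does, surfaces a type mismatch when the aggregator is created rather than as a ClassCastException while events are being processed.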
For example, if you have created the extension with the namespace currency and the function name fromUSDtoEUR, you can use it in a query as follows:
from StockExchangeStream[price >= 20]
select symbol, price as priceInUSD, currency:fromUSDtoEUR(price, 0.75) as priceInEUR
insert into StockQuoteStream;
The following sample code implements the factory for the above function.
package org.wso2.siddhi.extension;

import org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory;
import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "currency", function = "fromUSDtoEUR")
public class CurrencyConversionAggregatorFactory implements OutputAttributeAggregatorFactory {

    @Override
    public OutputAttributeAggregator createAttributeAggregator(Type[] types) {
        return new CurrencyConversionAggregatorDouble();
    }
}
The following OutputAttributeAggregator class implements the actual logic.
package org.wso2.siddhi.extension;

import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute.Type;

import java.text.DecimalFormat;

public class CurrencyConversionAggregatorDouble implements OutputAttributeAggregator {

    private static final long serialVersionUID = 1358667430272544590L;

    @Override
    public Type getReturnType() {
        return Type.DOUBLE;
    }

    @Override
    public Object processAdd(Object obj) {
        double amountEUR = 0D;
        // The function arguments arrive as an array: [amount in USD, conversion rate].
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double amountUSD = (Double) objArray[0];
            double conversionRate = (Double) objArray[1];
            // Convert and round the result to two decimal places.
            amountEUR = Double.valueOf(new DecimalFormat("#.##").format(amountUSD * conversionRate));
        }
        return amountEUR;
    }

    @Override
    public Object processRemove(Object obj) {
        double amountEUR = 0D;
        // Same conversion as processAdd; this function keeps no running state.
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double amountUSD = (Double) objArray[0];
            double conversionRate = (Double) objArray[1];
            amountEUR = Double.valueOf(new DecimalFormat("#.##").format(amountUSD * conversionRate));
        }
        return amountEUR;
    }

    @Override
    public OutputAttributeAggregator newInstance() {
        return new CurrencyConversionAggregatorDouble();
    }

    @Override
    public void destroy() {
        // No resources to release.
    }
}
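One point to watch in the sample above: new DecimalFormat("#.##") takes its decimal separator from the JVM's default locale, so on a locale that uses a comma (for example, German) the formatted text cannot be parsed back by Double.valueOf. The sketch below shows one possible locale-independent way to round the converted amount to two decimal places. The class name CurrencyRoundingSketch and the method name convert are hypothetical, and using BigDecimal here is a swapped-in alternative, not the approach taken in the original sample.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper: rounds a converted amount without going through
// locale-dependent text formatting.
final class CurrencyRoundingSketch {

    // Multiplies the amount by the rate and rounds to two decimal places.
    // HALF_EVEN is also the default rounding mode of DecimalFormat.
    static double convert(double amountUSD, double conversionRate) {
        return BigDecimal.valueOf(amountUSD * conversionRate)
                .setScale(2, RoundingMode.HALF_EVEN)
                .doubleValue();
    }

    public static void main(String[] args) {
        System.out.println(convert(100.0, 0.75)); // 75.0
        System.out.println(convert(20.0, 0.75));  // 15.0
    }
}
```

Inside processAdd and processRemove, such a helper could replace the Double.valueOf(new DecimalFormat("#.##").format(...)) expression, assuming two-decimal rounding is all that is required.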
Add the following entry on a new line in the <CEP_HOME>/repository/conf/siddhi/siddhi.extension file to register the CurrencyConversionAggregatorFactory class (that is, the OutputAttributeAggregatorFactory implementation from the above example).
org.wso2.siddhi.extension.CurrencyConversionAggregatorFactory