This documentation is for WSO2 CEP 3.1.0. View the home page of the latest release.

Unknown macro: {next_previous_links}
Skip to end of metadata
Go to start of metadata

You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Current »

To write a custom output attribute aggregator, create a class implementing "org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory" and add SiddhiExtension annotation to this class. Also create appropriate an OutputAttributeAggregator extending "org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator", which is created by the OutputAttributeAggregatorFactory based on the Attribute Type (Int, String, Long, Bool, Double, Float, etc). Then compile these classes and add the jar files to the class path at <CEP_HOME>/repository/components/lib. Then add the fully-qualified class name for the OutputAttributeAggregatorFactory implementation class in a new line, to the siddhi.extension file located at <CEP_HOME>/repository/conf/siddhi.

For example, if you have created the extension with namespace currency and function name fromUSDtoEUR, you can use that in the query as follows: 

from StockExchangeStream[price >= 20] 
select symbol, price as priceInUSD, currency:fromUSDtoEUR(price, 0.75) as priceInEUR 
insert into StockQuoteStream;

Given below is a sample code that implements the above function.

OuputAttributeAggregatorFactory class
package org.wso2.siddhi.extension;
import org.wso2.siddhi.core.query.selector.attribute.factory.OutputAttributeAggregatorFactory;
import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;

@SiddhiExtension(namespace = "currency", function = "fromUSDtoEUR")
public class CurrencyConversionAggregatorFactory implements OutputAttributeAggregatorFactory {
    @Override
    public OutputAttributeAggregator createAttributeAggregator(Type[] types) {
        return new CurrencyConversionAggregatorDouble();
    }
}

This is the OutputAttributeAggregator class which implements the actual logic.

OutputAttributeAggregator class
package org.wso2.siddhi.extension;

import org.wso2.siddhi.core.query.selector.attribute.handler.OutputAttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute.Type;
import java.text.DecimalFormat;

public class CurrencyConversionAggregatorDouble implements OutputAttributeAggregator {
    private static final long serialVersionUID = 1358667430272544590L;


    @Override
    public Type getReturnType() {
        return Type.DOUBLE;
    }


    @Override
    public Object processAdd(Object obj) {
        double amountEUR = 0D;
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double amountUSD = (Double)objArray[0];
            double conversionRate = (Double)objArray[1];
            amountEUR = Double.valueOf(new DecimalFormat("#.##").format(amountUSD * conversionRate));
        }
        return amountEUR;
    }


    @Override
    public Object processRemove(Object obj) {
        double amountEUR = 0D;
        if (obj instanceof Object[]) {
            Object[] objArray = (Object[]) obj;
            double amountUSD = (Double)objArray[0];
            double conversionRate = (Double)objArray[1];
            amountEUR = Double.valueOf(new DecimalFormat("#.##").format(amountUSD * conversionRate));
        }
        return amountEUR;
    }


    @Override
    public OutputAttributeAggregator newInstance() {
        return new CurrencyConversionAggregatorDouble();
    }
 
    @Override
    public void destroy() {
    }
}
 
 
  • No labels