Writing a Custom Aggregate Function
A Siddhi aggregate function consumes zero or more parameters for each event and outputs a single attribute holding a result aggregated from those parameters. It can be used together with a window to compute the aggregate over the events in that window, like the built-in Aggregate Function operator.
To implement a custom aggregate function, follow the procedure below.
- Create a class extending org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator.
- Create an appropriate .siddhiext extension mapping file.
- Compile the class.
- Build the jar containing the .class and .siddhiext files.
- Add the jar to the Siddhi class path. If you need to run the extension on WSO2 CEP, add it to the <CEP_HOME>/repository/components/dropins directory.
For example, an aggregate function extension with custom as the namespace and count as the function name can be referred to in a query as follows.
from pizzaOrder#window.length(20)
select custom:count(orderNo) as totalOrders
insert into orderCount;
A sample implementation of a custom aggregate function extension is given below.
/*
 * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
 *
 * WSO2 Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.wso2.siddhi.extension.custom.aggregate;

import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.selector.attribute.aggregator.AttributeAggregator;
import org.wso2.siddhi.query.api.definition.Attribute;

/**
 * Custom Count Extension which returns event count as a long
 */
public class CountAggregateFunction extends AttributeAggregator {

    private static Attribute.Type type = Attribute.Type.LONG;
    private long value = 0L;

    /**
     * The initialization method for CountAggregateFunction
     *
     * @param attributeExpressionExecutors are the executors of each attributes in the function
     * @param executionPlanContext         Execution plan runtime context
     */
    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
        //Implement class specific initialization
    }

    /**
     * The process add method of the CountAggregateFunction, used when zero or one function parameter is provided
     *
     * @param data null if the function parameter count is zero or runtime data value of the function parameter
     * @return the count value
     */
    @Override
    public Object processAdd(Object data) {
        value++;
        return value;
    }

    /**
     * The process add method of the CountAggregateFunction, used when more than one function parameters are provided
     *
     * @param data the data values for the function parameters
     * @return the count value
     */
    @Override
    public Object processAdd(Object[] data) {
        value++;
        return value;
    }

    /**
     * The process remove method of the CountAggregateFunction, used when zero or one function parameter is provided
     *
     * @param data null if the function parameter count is zero or runtime data value of the function parameter
     * @return the count value
     */
    @Override
    public Object processRemove(Object data) {
        value--;
        return value;
    }

    /**
     * The process remove method of the CountAggregateFunction, used when more than one function parameters are provided
     *
     * @param data the data values for the function parameters
     * @return the count value
     */
    @Override
    public Object processRemove(Object[] data) {
        value--;
        return value;
    }

    /**
     * Reset count value
     *
     * @return reset value
     */
    @Override
    public Object reset() {
        value = 0L;
        return value;
    }

    /**
     * This will be called only once and this can be used to acquire
     * required resources for the processing element.
     * This will be called after initializing the system and before
     * starting to process the events.
     */
    @Override
    public void start() {
        //Implement start logic to acquire relevant resources
    }

    /**
     * This will be called only once and this can be used to release
     * the acquired resources for processing.
     * This will be called before shutting down the system.
     */
    @Override
    public void stop() {
        //Implement stop logic to release the acquired resources
    }

    /**
     * Used to collect the serializable state of the processing element, that need to be
     * persisted for the reconstructing the element to the same state on a different point of time
     *
     * @return stateful objects of the processing element as an array
     */
    @Override
    public Object[] currentState() {
        return new Object[]{value};
    }

    /**
     * Used to restore serialized state of the processing element, for reconstructing
     * the element to the same state as if was on a previous point of time.
     *
     * @param state the stateful objects of the element as an array on
     *              the same order provided by currentState().
     */
    @Override
    public void restoreState(Object[] state) {
        value = (Long) state[0];
    }

    public Attribute.Type getReturnType() {
        return type;
    }
}
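To illustrate how the add, remove, and state callbacks fit together, here is a minimal plain-Java sketch. It is not Siddhi's actual runtime: CountSketch is a hypothetical stand-in that mirrors the methods of CountAggregateFunction without depending on Siddhi classes, and LengthWindowDemo is a hypothetical driver. It assumes that a length window calls processRemove for the event that expires and processAdd for each event that arrives.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in mirroring CountAggregateFunction, with no Siddhi dependency. */
class CountSketch {
    private long value = 0L;

    Object processAdd(Object data) { value++; return value; }

    Object processRemove(Object data) { value--; return value; }

    Object[] currentState() { return new Object[]{value}; }

    void restoreState(Object[] state) { value = (Long) state[0]; }
}

/** Hypothetical driver: a simplified length window feeding the aggregator. */
class LengthWindowDemo {

    /** Sends eventCount events through a window of windowLength and returns the final count. */
    static long countAfter(CountSketch aggregator, int windowLength, int eventCount) {
        Deque<Integer> window = new ArrayDeque<>();
        for (int orderNo = 1; orderNo <= eventCount; orderNo++) {
            if (window.size() == windowLength) {
                // Assumption: the oldest event expires before the new one is added.
                aggregator.processRemove(window.removeFirst());
            }
            window.addLast(orderNo);
            aggregator.processAdd(orderNo);
        }
        return (Long) aggregator.currentState()[0];
    }

    public static void main(String[] args) {
        // Window not yet full: the count equals the number of events seen.
        System.out.println(countAfter(new CountSketch(), 20, 5));

        // Window full: each arrival is balanced by one expiry, so the count stays at 20.
        CountSketch full = new CountSketch();
        System.out.println(countAfter(full, 20, 25));

        // Snapshot the state and restore it into a fresh instance, as persistence would.
        CountSketch restored = new CountSketch();
        restored.restoreState(full.currentState());
        System.out.println(restored.processAdd(null));
    }
}
```

Because the count only changes through processAdd and processRemove, the single long returned by currentState() is enough to rebuild the aggregator after a restore.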
A sample custom.siddhiext extension mapping file for the custom aggregate function extension is given below.
#
# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
count=org.wso2.siddhi.extension.custom.aggregate.CountAggregateFunction
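The mapping file has the shape of a Java properties file: each key is a function name and each value is the implementing class. The sketch below parses such content with java.util.Properties to show how a file named custom.siddhiext could yield the qualified name custom:count used in the query. It assumes properties-style parsing and that the namespace is taken from the file name. MappingSketch and qualifiedNames are hypothetical names, and this is not Siddhi's actual extension loader.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper; illustrates the mapping file layout only. */
class MappingSketch {

    private static final String SUFFIX = ".siddhiext";

    /** Returns "namespace:function" to class name, assuming the namespace is the file's base name. */
    static Map<String, String> qualifiedNames(String fileName, String content) {
        if (!fileName.endsWith(SUFFIX)) {
            throw new IllegalArgumentException("Not a .siddhiext file: " + fileName);
        }
        String namespace = fileName.substring(0, fileName.length() - SUFFIX.length());

        Properties mapping = new Properties();
        try {
            // Lines starting with '#' are treated as comments by Properties.load.
            mapping.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        Map<String, String> result = new TreeMap<>();
        for (String functionName : mapping.stringPropertyNames()) {
            result.put(namespace + ":" + functionName, mapping.getProperty(functionName));
        }
        return result;
    }

    public static void main(String[] args) {
        String content = "# license header omitted\n"
                + "count=org.wso2.siddhi.extension.custom.aggregate.CountAggregateFunction\n";
        System.out.println(qualifiedNames("custom.siddhiext", content));
    }
}
```

Additional functions in the same namespace would be further key=value lines in the same file.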