
Priority Extension

PriorityStreamProcessor keeps track of the priority of events in a stream. It takes the following three arguments.

  • A unique key variable to identify the event.
  • A priority variable that contains the priority increment.
  • A constant timeout, after which the priority is decreased by one.

 

Syntax: #priority:time(variable, priority, timeout)
Extension Type: StreamProcessor
Parameters:
  • variable: A unique key variable to identify the event.
  • priority: The priority variable that contains the priority increment.
  • timeout: A constant timeout, after which the priority is decreased by one.
Return

When an event with a new unique key arrives, the PriorityStreamProcessor checks its priority. If the priority is 0, the event is sent out without being stored internally. If the priority is greater than 0, the event is stored in the stream processor and the current priority is injected into it.

When an event with an existing priority key arrives, it is stored as the most recent event for that key, the stored priority is increased by the priority of the received event, and the priorityKey and the currentPriority are injected into the event.

After every given timeout, the priority of each event is reduced by 1, and the updated priority is sent out with the last known attributes of those events. This continues until their priority is reduced to 0.

When an event with an existing key and a large negative priority arrives, the resulting priority is output as 0, never as a negative value.
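The rules above can be sketched as a small Python model. This is only an illustration of the described behaviour, not the Siddhi implementation: the class name PriorityTracker and its methods on_event and on_timeout are hypothetical, a single timeout tick is assumed to apply to all stored keys at once, the injected attributes are assumed to be added even when a new key arrives with priority 0, and a negative priority on a new key is assumed to be treated as 0 (the text above only states this for existing keys).

```python
from dataclasses import dataclass, field


@dataclass
class PriorityTracker:
    """Toy model of the documented behaviour; all names are hypothetical."""

    # key -> [current priority, last known attributes]
    state: dict = field(default_factory=dict)

    def on_event(self, key, priority, attrs):
        """Handle an arriving event and return the event that is sent out."""
        if key in self.state:
            # Existing key: add the increment, but never go below 0.
            current = max(0, self.state[key][0] + priority)
        else:
            # New key. Assumption: a negative first priority counts as 0.
            current = max(0, priority)
        if current > 0:
            # Keep the latest attributes so timeouts can re-emit them.
            self.state[key] = [current, attrs]
        else:
            # Priority 0: the event is sent without being stored.
            self.state.pop(key, None)
        return {**attrs, "priorityKey": key, "currentPriority": current}

    def on_timeout(self):
        """One timeout tick: reduce each stored priority by 1 and emit it."""
        out = []
        for key in list(self.state):
            entry = self.state[key]
            entry[0] -= 1
            out.append({**entry[1], "priorityKey": key, "currentPriority": entry[0]})
            if entry[0] == 0:
                # Stop tracking once the priority has decayed to 0.
                del self.state[key]
        return out


tracker = PriorityTracker()
tracker.on_event("WSO2", 2, {"volume": 10})    # currentPriority is 2
tracker.on_event("WSO2", 3, {"volume": 20})    # currentPriority is 2 + 3 = 5
tracker.on_timeout()                           # emits WSO2 with currentPriority 4
tracker.on_event("WSO2", -100, {"volume": 30}) # currentPriority is 0, not -96
```

In this model the timeout is driven by calling on_timeout explicitly, which keeps the sketch deterministic; the real extension is driven by the timeout argument of the query.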

Example

define stream cseEventStream (symbol string, priority long, volume int);

@info(name = 'query1')
from cseEventStream#priority:time(symbol, priority, 1 sec)
select *
insert all events into outputStream;