com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_link3' is unknown.

Understanding Event Streams and Event Tables

Events are the lifeline of WSO2 CEP/DAS. They not only process data as events, but also interact with external systems using events. Event is a unit of data, and an event stream is a sequence of events of a particular type. The type of events can be defined as an event stream definition. The following sections explain how to work with events in WSO2 DAS.

Event streams

You can manage event streams through event stream definitions. 

Event stream definition


Definitions of the event streams are stored in the filesystem as deployable artifacts in the <PRODUCT _HOME>/repository/deployment/server/eventstreams/ directory as .json files. These are hot deployable files and can be added/removed when the server is up and running. A sample event stream definition is as follows.


{
  "streamId": "org.wso2.test:1.0.0",
  "name": "org.wso2.test",
  "version": "1.0.0",
  "nickName": "TestStream",
  "description": "Test Stream",
  "metaData": [
    {
      "name": "ip",
      "type": "STRING"
    }
  ],
  "correlationData": [
    {
      "name": "id",
      "type": "LONG"
    }
  ],
  "payloadData": [
    {
      "name": "testMessage",
      "type": "STRING"
    }
  ]
}

The properties of the above event stream definition are described below.

PropertyDescription
Event Stream NameName of the event stream.
Event Stream VersionVersion of the event stream. (Default value is 1.0.0.)
Event Stream DescriptionDescription of the events stream. (This is optional.)
Event Stream Nick-NameNick-names of an event streams separated by commas.(This is optional.)
Stream Attributes

Stream attributes contains the data of the event. Data is divided into the following 3 logical categories for maintenance and usability. It is not required to have attributes for all 3 categories, but there should be at least one category with at least one attribute defined. The attribute names should be unique within each category.

  • Meta Data: This refers to meta information of the events (e.g., timestamp, host, IP, etc.). They are received with the meta tag (i.e., in the meta_<attribute name> format).
  • Correlation Data: This refers to information that is used to correlate multiple events (e.g., correlation_id,temporal_id etc.). They are received with the correlation tag (i.e., in the correlation_<attribute name> format).
  • Payload Data: Contains the actual data that the event intends to have. (Referred to as <attribute name>.)

e.g., The following attributes exist in a single event.

  • event_timestamp
  • request_IP_address
  • correlation_Id
  • price
  • symbol

These attributes can be logically categorized as follows.

CategoryAttributesNotes
Meta Data
  • event_timestamp
  • request_IP_address
These attributes provide information about the events themselves.
Correlation Data
  • correlation_Id
This attribute correlates the event with other events from other streams, and it is useful when performing join operations on a stream.
Payload Data
  • price
  • symbol
These are information provided via the event.
  • Note that the internal system does not differentiate the attributes based on this categorization. However, it is recommended to categorize the attributes for the purpose of organizing them in a logical manner within the stream definition.
  • If you edit an attribute in an event stream or add a new attribute to it, other artifacts that are associated with the stream will be inactive. Therefore, it is recommended to redefine the stream with a new version including the additional/changed attribute.

Adding an event stream 

You can create an event stream by creating a new event stream definition using the design view or the source view.

Using the design view

Follow the steps below to add an event stream using the design view.

  1. Log in to the management console, and click Main.
  2. Click Event Streams in the Event Processor menu, and then click Add Event Stream.
  3. Enter details of the stream definition that you want to create as shown in the below example.
    add new event stream
  4. Click Add Event Stream, to create the Event Stream in the system. When you click OK in the pop-up message on successful addition of the stream definition, you view it in the Available Event Streams list as shown below.
    available event streams list
Using the source view

Follow the steps below to add an event stream using the source view.

  1. Log in to the management console, and click Main.
  2. Click Event Streams in the Event Processor menu, and then click Add Event Stream.
  3. Click switch to source view.

    Click switch to design view to add the event stream using the design view.

  4. Enter details of the stream definition that you want to create as shown in the below example.
    design view of adding an event stream
  5. Click Add Event Stream, to create the event stream in the system. You can view the new event stream in the Available Event Streams list as shown below.

Deleting an event stream 

Follow the steps below to delete an event stream by deleting the corresponding event stream definition.

  1. Log in to the management console, and click  Main .
  2. Click Event Streams in the Event Processor menu. You view the Available Event Streams list.
  3. Click the  Delete  button of the corresponding event stream to delete it.

Editing an event stream 

Follow the steps below to edit an event stream by editing the corresponding event stream definition.

  1. Log in to the management console, and click Main.
  2. Click Event Streams in the Event Processor menu. You view the Available Event Streams list.
  3. Click the Edit button of the corresponding event stream to edit it.

    Click the  switch to source view  link to edit an event stream using the source view.

Creating sample events

Follow the steps below to create sample events for a defined event stream.

  1. Log in to the management console, and click Main.
  2. Click Event Streams in the Event Processor menu. You view the Available Event Streams list.
  3. Click the Event Stream Id of the corresponding event stream for which you want to create the sample event. 
  4. Select the event format type (i.e. xml, json, or text) from the drop down list, which you want to create the sample event in. 
  5. You view details of the event stream as shown below. 
  6. Click Generate Event to create the sample event.

Event formats

WSO2 CEP/DAS facilitates the following default and custom event formats.

Default event formats

By default, WSO2 CEP/DAS represents an event as a WSO2Event object. Furthermore, WSO2 CEP/DAS supports events in XML, JSON, Text and Map formats. The default event formats of the XML, JSON, Text and Map representations for the following sample event stream definition are as follows.

Sample event stream definition
{
  "streamId": "org.wso2.test:1.0.0",
  "name": "org.wso2.test",
  "version": "1.0.0",
  "nickName": "TestStream",
  "description": "Test Stream",
  "metaData": [
    {
      "name": "ip",
      "type": "STRING"
    }
  ],
  "correlationData": [
    {
      "name": "id",
      "type": "LONG"
    }
  ],
  "payloadData": [
    {
      "name": "testMessage",
      "type": "STRING"
    }
  ]
}
Default XML format
<events>
    <event>
        <metaData>
            <ip>data4</ip>
        </metaData>
        <correlationData>
            <id>56783</id>
        </correlationData>
        <payloadData>
            <testMessage>data1</testMessage>
        </payloadData>
    </event>
</events>
Default JSON format
{
    "event": {
        "metaData": {
            "ip": "data4"
        },
        "correlationData": {
            "id": "545455"
        },
        "payloadData": {
            "testMessage": "data1"
        }
    }
}
Default text format
meta_ip:data1,
correlation_id:323232,
testMessage:data2
Default map format
KeyValue

meta_ip

data1

correlation_id

323232

testMessage

data2

Custom event formats

If you receive and publish events with a different format than the default format, you need to provide appropriate mappings for the system to interpret the events.

Custom formats for receiving events

For information on the custom event receiver mappings, see Input Mapping Types.

Custom formats for publishing events

For information on the custom event publisher mappings, see Output Mapping Types.

Event Flow

Event flow visualizes the stream flow in WSO2 CEP/DAS to easily navigate to different WSO2 CEP/DAS components. 

Follow the steps below to view the event flow.

  1. Log in to the management console.
  2. Click Main, and then click Event Flow in the Event Processor menu.

This demonstrates how all the active event receivers, event streams, event publishers, and execution plans are connected.

 


Analytics event table

In batch analytics, an event table is used to persist events from a stream, and to later lookup/update/delete from it. The Data Analytics Server contains an event table implementation based on its Data Access Layer, where users can create an event table based on the underlying configured data source of the server.

The analytics event table will follow the best approach in carrying out its tasks, e.g. using the primary keys if there are no non-equal conditional expressions in the queries etc.. or else, for conditions that require less than, greater than, less than or equal, greater than or equal, contains, then the respective fields must be marked as indexed.

The syntax in using the analytics event table is as follows:-

@from(eventtable = 'analytics.table' , table.name = <analytics_table_name>, primary.keys = <primary_keys>, indices = <indices>, wait.for.indexing = <wait_for_indexing_flag>, merge.schema = <merge_schema_flag>)
define table <EventTableName> (<schema>);
Field NameDescriptionRequiredDefault Value
table.nameThe name of the analytics table, this can be an existing table, or else, a new one will be created.Yes
primary.keys

The list of fields to be used as the primary keys of the table, this can be useful, if the lookup operations are done only using primary key values,

which is the most efficient to execute.

No
indices

The list of index fields separated by commas, each entry consists of the format "<index_column_name> -sp", where "-sp" is optional property to say,

if this index column should be treated as a score param.

No
wait.for.indexing

The indexing operations in the analytics tables happens asynchronously, if events coming from a specific stream changing the event table's data needs

to be finalized, setting this flag to 'true' would wait till the background indexing to finish and continue with the execution of the flow.

Notrue
merge.schema

In the case of an existing table given to the analytics event table, if this flag is set to 'true', the existing schema and the given schema will be merged together.

That is, the merging of columns and its indexing information, if set to 'false', the schema given by the analytics event table will overwrite the existing one.

Notrue
caching

Enables caching for the analytics table. This will store any looked up data from the analytics table and store the most recently accessed data,

with the given capacity and timeout restrictions of the cache.

Nofalse
cache.timeout.secondsThe timeout of the cache entries in secondsNo10
cache.size.bytesThe maximum capacity of the cache in bytesNo10485760
Sample
@from(eventtable = 'analytics.table' , table.name = 'stocks', primary.keys = 'symbol', indices = 'price, volume -sp', wait.for.indexing = 'true', merge.schema = 'false')
define table StockTable (symbol string, price float, volume long);
com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links2' is unknown.