Understanding Event Streams and Event Tables
Events are the lifeblood of WSO2 CEP/DAS: the server not only processes data as events, but also interacts with external systems using events. An event is a unit of data, and an event stream is a sequence of events of a particular type. The type of an event is defined by an event stream definition. The following sections explain how to work with events in WSO2 DAS.
Event streams
You can manage event streams through event stream definitions.
- Event stream definition
- Adding an event stream
- Using the source view
- Deleting an event stream
- Editing an event stream
- Creating sample events
Event stream definition
Event stream definitions are stored in the filesystem as deployable artifacts in the `<PRODUCT_HOME>/repository/deployment/server/eventstreams/` directory as `.json` files. These files are hot deployable and can be added or removed while the server is up and running. A sample event stream definition is as follows.
```json
{
  "streamId": "org.wso2.test:1.0.0",
  "name": "org.wso2.test",
  "version": "1.0.0",
  "nickName": "TestStream",
  "description": "Test Stream",
  "metaData": [
    { "name": "ip", "type": "STRING" }
  ],
  "correlationData": [
    { "name": "id", "type": "LONG" }
  ],
  "payloadData": [
    { "name": "testMessage", "type": "STRING" }
  ]
}
```
The properties of the above event stream definition are described below.
Property | Description
---|---
Event Stream Name | Name of the event stream.
Event Stream Version | Version of the event stream. (Default value is 1.0.0.)
Event Stream Description | Description of the event stream. (Optional.)
Event Stream Nick-Name | Nick-names of the event stream, separated by commas. (Optional.)
Stream Attributes | Stream attributes contain the data of the event. Data is divided into three logical categories (meta, correlation, and payload) for maintenance and usability. It is not required to have attributes in all three categories, but at least one category must have at least one attribute defined. Attribute names must be unique within each category. For example, in the sample definition above, `ip` is a meta attribute, `id` is a correlation attribute, and `testMessage` is a payload attribute.
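The three attribute categories above can be pictured with a minimal sketch in plain Python (this is an illustration, not a WSO2 API):

```python
# A single event's attributes grouped into the three logical categories
# described above: meta, correlation, and payload.
event = {
    "metaData":        {"ip": "192.168.1.1"},     # transport/meta information
    "correlationData": {"id": 56783},             # values used to correlate events
    "payloadData":     {"testMessage": "data1"},  # the actual business data
}

def attribute_names(evt):
    """Return the attribute names across all three categories, in order."""
    names = []
    for category in ("metaData", "correlationData", "payloadData"):
        names.extend(evt.get(category, {}).keys())
    return names

print(attribute_names(event))  # ['ip', 'id', 'testMessage']
```

Note that an attribute name only needs to be unique within its own category, which is why the categories are kept separate.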
Adding an event stream
You can create an event stream by creating a new event stream definition using the design view or the source view.
Using the design view
Follow the steps below to add an event stream using the design view.
- Log in to the management console, and click Main.
- Click Event Streams in the Event Processor menu, and then click Add Event Stream.
- Enter details of the stream definition that you want to create as shown in the below example.
- Click Add Event Stream to create the event stream in the system. When you click OK in the pop-up message that confirms the successful addition of the stream definition, the new stream appears in the Available Event Streams list as shown below.
Using the source view
Follow the steps below to add an event stream using the source view.
- Log in to the management console, and click Main.
- Click Event Streams in the Event Processor menu, and then click Add Event Stream.
- Click switch to source view. (You can click switch to design view at any time to go back to adding the event stream using the design view.)
- Enter details of the stream definition that you want to create as shown in the below example.
- Click Add Event Stream to create the event stream in the system. You can view the new event stream in the Available Event Streams list as shown below.
Deleting an event stream
Follow the steps below to delete an event stream by deleting the corresponding event stream definition.
- Log in to the management console, and click Main.
- Click Event Streams in the Event Processor menu. The Available Event Streams list is displayed.
- Click the Delete button of the corresponding event stream to delete it.
Editing an event stream
Follow the steps below to edit an event stream by editing the corresponding event stream definition.
- Log in to the management console, and click Main.
- Click Event Streams in the Event Processor menu. The Available Event Streams list is displayed.
- Click the Edit button of the corresponding event stream to edit it.
Creating sample events
Follow the steps below to create sample events for a defined event stream.
- Log in to the management console, and click Main.
- Click Event Streams in the Event Processor menu. The Available Event Streams list is displayed.
- Click the Event Stream Id of the event stream for which you want to create the sample event. Details of the event stream are displayed as shown below.
- Select the event format type (i.e., XML, JSON, or text) in which you want to create the sample event from the drop-down list.
- Click Generate Event to create the sample event.
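To make the idea behind sample event generation concrete, the following is a small sketch in plain Python (a hypothetical helper, not part of WSO2) that builds a sample event skeleton from an event stream definition like the one shown earlier:

```python
import json

# Placeholder values per attribute type, used to fill the sample event.
DEFAULTS = {"STRING": "sample", "INT": 0, "LONG": 0,
            "FLOAT": 0.0, "DOUBLE": 0.0, "BOOL": False}

def sample_event(definition):
    """Build a sample event dict from a stream definition's attribute lists."""
    event = {}
    for key in ("metaData", "correlationData", "payloadData"):
        attrs = definition.get(key, [])
        if attrs:
            event[key] = {a["name"]: DEFAULTS[a["type"]] for a in attrs}
    return {"event": event}

definition = {
    "name": "org.wso2.test", "version": "1.0.0",
    "metaData": [{"name": "ip", "type": "STRING"}],
    "correlationData": [{"name": "id", "type": "LONG"}],
    "payloadData": [{"name": "testMessage", "type": "STRING"}],
}
print(json.dumps(sample_event(definition), indent=2))
```

The generated skeleton mirrors the default JSON event format described in the next section, with one entry per defined attribute.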
Event formats
WSO2 CEP/DAS facilitates the following default and custom event formats.

Default event formats
By default, WSO2 CEP/DAS represents an event as a WSO2Event object. Furthermore, WSO2 CEP/DAS supports events in XML, JSON, Text and Map formats. The default event formats of the XML, JSON, Text and Map representations for the following sample event stream definition are as follows.
Sample event stream definition
```json
{
  "streamId": "org.wso2.test:1.0.0",
  "name": "org.wso2.test",
  "version": "1.0.0",
  "nickName": "TestStream",
  "description": "Test Stream",
  "metaData": [
    { "name": "ip", "type": "STRING" }
  ],
  "correlationData": [
    { "name": "id", "type": "LONG" }
  ],
  "payloadData": [
    { "name": "testMessage", "type": "STRING" }
  ]
}
```
Default XML format
```xml
<events>
  <event>
    <metaData>
      <ip>data4</ip>
    </metaData>
    <correlationData>
      <id>56783</id>
    </correlationData>
    <payloadData>
      <testMessage>data1</testMessage>
    </payloadData>
  </event>
</events>
```
Default JSON format
```json
{
  "event": {
    "metaData": { "ip": "data4" },
    "correlationData": { "id": "545455" },
    "payloadData": { "testMessage": "data1" }
  }
}
```
Default text format
```
meta_ip:data1, correlation_id:323232, testMessage:data2
```
Default map format
Key | Value |
---|---|
meta_ip | data1 |
correlation_id | 323232 |
testMessage | data2 |
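In both the text and map representations, meta and correlation attributes are prefixed with `meta_` and `correlation_`, while payload attributes keep their bare names. The following is a minimal sketch in plain Python (an illustration, not a WSO2 API) of that naming convention for the text format:

```python
def to_text(meta, correlation, payload):
    """Render an event in the default text format: prefixed meta/correlation
    attributes followed by bare payload attributes, comma-separated."""
    parts = []
    parts += [f"meta_{k}:{v}" for k, v in meta.items()]
    parts += [f"correlation_{k}:{v}" for k, v in correlation.items()]
    parts += [f"{k}:{v}" for k, v in payload.items()]
    return ", ".join(parts)

line = to_text({"ip": "data1"}, {"id": 323232}, {"testMessage": "data2"})
print(line)  # meta_ip:data1, correlation_id:323232, testMessage:data2
```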
Custom event formats
If you receive and publish events with a different format than the default format, you need to provide appropriate mappings for the system to interpret the events.
Custom formats for receiving events
For information on the custom event receiver mappings, see Input Mapping Types.
Custom formats for publishing events
For information on the custom event publisher mappings, see Output Mapping Types.
Event Flow
The event flow visualizes how streams flow through WSO2 CEP/DAS, making it easy to navigate to the different WSO2 CEP/DAS components.
Follow the steps below to view the event flow.
- Log in to the management console.
- Click Main, and then click Event Flow in the Event Processor menu.
This view shows how all the active event receivers, event streams, event publishers, and execution plans are connected.
Analytics event table
In batch analytics, an event table is used to persist events from a stream so that they can later be looked up, updated, or deleted. The Data Analytics Server contains an event table implementation based on its Data Access Layer, which lets users create an event table backed by the server's configured underlying data source.
The analytics event table selects the most efficient approach for each task; for example, it uses the primary keys when the queries contain no non-equality conditional expressions. For conditions that use less than, greater than, less than or equal, greater than or equal, or contains, the respective fields must be marked as indexed.
The syntax for using the analytics event table is as follows:

```sql
@from(eventtable = 'analytics.table', table.name = <analytics_table_name>, primary.keys = <primary_keys>, indices = <indices>, wait.for.indexing = <wait_for_indexing_flag>, merge.schema = <merge_schema_flag>)
define table <EventTableName> (<schema>);
```
Field Name | Description | Required | Default Value
---|---|---|---
table.name | The name of the analytics table. This can be an existing table; otherwise, a new one is created. | Yes | 
primary.keys | The list of fields to be used as the primary keys of the table. This is useful when lookup operations are done only using primary key values, which is the most efficient way to execute them. | No | 
indices | The list of index fields, separated by commas. Each entry has the format "<index_column_name> -sp", where "-sp" is an optional property indicating that the index column should be treated as a score parameter. | No | 
wait.for.indexing | Indexing operations on analytics tables happen asynchronously. If changes to the event table's data made by events from a specific stream need to be finalized before execution continues, set this flag to 'true' to wait until the background indexing finishes. | No | true
merge.schema | When an existing table is given to the analytics event table and this flag is set to 'true', the existing schema and the given schema are merged together (that is, their columns and indexing information are combined). If set to 'false', the schema given by the analytics event table overwrites the existing one. | No | true
caching | Enables caching for the analytics table. Data looked up from the analytics table is cached, retaining the most recently accessed data subject to the given capacity and timeout restrictions of the cache. | No | false
cache.timeout.seconds | The timeout of the cache entries, in seconds. | No | 10
cache.size.bytes | The maximum capacity of the cache, in bytes. | No | 10485760
The following example defines a StockTable analytics event table:

```sql
@from(eventtable = 'analytics.table', table.name = 'stocks', primary.keys = 'symbol', indices = 'price, volume -sp', wait.for.indexing = 'true', merge.schema = 'false')
define table StockTable (symbol string, price float, volume long);
```
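Once defined, such an event table is typically joined with an incoming stream inside an execution plan. The following is a sketch in SiddhiQL, assuming the StockTable definition above; the StockCheckStream and StockResultStream names are hypothetical and chosen for illustration:

```sql
define stream StockCheckStream (symbol string);

-- Look up the persisted price and volume for each incoming symbol.
from StockCheckStream join StockTable
    on StockCheckStream.symbol == StockTable.symbol
select StockTable.symbol, StockTable.price, StockTable.volume
insert into StockResultStream;
```

Because `symbol` is the table's primary key and the join condition is a pure equality, this lookup can use the primary-key path described above rather than an index scan.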