Upgrading from a Previous Release
This section provides information on how you can upgrade from WSO2 DAS/CEP to WSO2 SP.
Overview
WSO2 DAS 3.1.0 is the predecessor of WSO2 SP 4.0.0. Similar to SP, DAS processed events via an event flow that consisted of event streams, receivers, publishers, and execution plans. These elements of the event flow are defined separately from each other via the DAS Management Console.
WSO2 SP defines the complete event flow within a single application created via a Siddhi file. The application is then deployed in a SP worker node and executed at runtime. Due to this architectural difference between the two products, configurations cannot be directly migrated from DAS to SP. Instead, they need to be recreated as explained in this section.
With WSO2 SP's streaming SQL capabilities and its inbuilt editor with event simulation and debugging support, you can create real-time applications much faster than before. WSO2 SP 4.0.0 is a new product focused on solving stream processing and complex event processing use cases.
WSO2 SP uses a single Siddhi Streaming SQL file to script the data collection, processing, and notification logic. The batch analytics aspect is handled via Siddhi aggregations.
For more information about the key capabilities of WSO2 SP, see About This Release.
Deployable Artifacts
Siddhi applications are the deployable artifact type of the Stream Processor.
To use Siddhi, you need to write the processing logic as a Siddhi application in the Siddhi Streaming SQL language. Once a Siddhi application is created and started, it does the following:
- Takes data one-by-one as events
- Processes the data in each event
- Generates new high-level events based on the processing carried out up to the current time
- Sends the newly generated events as output to streams.
Multiple Siddhi SQL elements can be composed together as a script in a Siddhi application. Here, each construct must be separated by a semicolon ( ; ) as shown in the syntax below.
<siddhi app> : <app annotation> * ( <stream definition> | <table definition> | ... ) + ( <query> | <partition> ) + ;
The following is a sample Siddhi application named Temperature-Analytics that includes a stream named TempStream and a query named 5minAvgQuery to process the events handled by it.
@app:name('Temperature-Analytics')

define stream TempStream (deviceID long, roomNo int, temp double);

@name('5minAvgQuery')
from TempStream#window.time(5 min)
select roomNo, avg(temp) as avgTemp
    group by roomNo
insert into OutputStream;
Event Streams -> Stream Definition
A stream such as the TempStream stream groups events of a common type together. This enables them to be processed via queries using their defined attributes in a streaming manner, and allows sinks and sources to map events to/from various data formats.
When migrating event stream definitions in WSO2 DAS, you must rewrite them in the syntax followed in Siddhi applications as illustrated in the table below.
Configuration in WSO2 DAS | { "streamId": "TempStream:1.0.0", "name": "TempStream", "version": "1.0.0", "metaData": [ { "name": "ip", "type": "STRING" } ], "correlationData": [ { "name": "id", "type": "LONG" } ], "payloadData": [ { "name": "deviceID", "type": "LONG" }, { "name": "deviceID", "type": "LONG" }, { "name": "temp", "type": "DOUBLE" } ] } |
---|---|
Configuration in Siddhi file | define stream TempStream (deviceID long, roomNo int, temp double); |
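If a DAS stream definition also contains meta or correlation attributes, they can be carried over as ordinary attributes in the Siddhi stream definition, since SP makes no distinction between attribute categories. The following is a minimal sketch; the meta_ and correlation_ prefixes are only an illustrative naming convention, not a requirement:

define stream TempStream (meta_ip string, correlation_id long, deviceID long, roomNo int, temp double);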
Event Receiver -> Source
In WSO2 CEP/DAS, events are received by event receivers that manage the event retrieval process. In Siddhi files deployed in WSO2 SP, you need to configure sources instead of event receivers to receive events.
To configure a stream that consumes events via a source, add the source configuration to a stream definition by adding the @source annotation with the required parameter values.
Configuration in WSO2 DAS | <eventReceiver name="test_event_receiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver"> <from eventAdapterType="event_type"> <property name="static.option.key1">static_option_value1</property> <property name="static.option.keyN">static_option_valueN</property> </from> <mapping customMapping="disable" type="map_type"/> <to streamName="testEventStream" version="1.0.0"/> </eventReceiver> |
---|---|
Configuration in Siddhi file | @source(type='source_type', static.option.key1='static_option_value1', static.option.keyN='static_option_valueN', @map(type='map_type', static.option.key1='static_option_value1', static.option.keyN='static_option_valueN', @attributes( attributeN='attribute_mapping_N', attribute1='attribute_mapping_1') ) ) define stream testEventStream (attribute1 Type1, attributeN TypeN); |
The type parameter of @source defines the source type that receives events. The other parameters to be configured depend on the source type selected. Some of the parameters are optional.
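For example, the following is a minimal sketch of a stream that receives the WSO2 events previously consumed by a DAS receiver, assuming the wso2event source and mapper extensions are available in your SP distribution (extension and parameter names can differ between versions):

@source(type='wso2event', @map(type='wso2event'))
define stream TempStream (deviceID long, roomNo int, temp double);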
For detailed information about the parameters, see the documentation for the relevant source.
The following is the list of source types that are currently supported:
Event Publisher -> Sink
In WSO2 CEP/DAS, events are published via event publishers that manage the event publishing process. In Siddhi files deployed in WSO2 SP, you need to configure sinks instead of event publishers to publish events.
To configure a stream that provides events via a sink, add the sink configuration to a stream definition by adding the @sink annotation with the required parameter values.
Configuration in WSO2 DAS | <eventPublisher name="httpLogger" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher"> <from streamName="org.wso2.event.sensor.stream" version="1.0.0"/> <mapping customMapping="disable" type="text"/> <to eventAdapterType="logger"> <property name="uniqueId">org.wso2.event.statistics.logger</property> </to> </eventPublisher> |
---|---|
Configuration in Siddhi file | @sink(type = 'log', @map(type = 'text')) define stream sensorStream (sensorId int, temperature double); |
For detailed information about the parameters, see the documentation for the relevant sink.
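For example, the following is a minimal sketch of a sink that publishes the same stream as JSON over HTTP, assuming the HTTP sink extension is installed; the publisher URL is a placeholder:

@sink(type='http', publisher.url='http://localhost:8005/sensor', method='POST', @map(type='json'))
define stream sensorStream (sensorId int, temperature double);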
The following is a list of currently supported sink types.
Sink and Source Mappers
In the WSO2 CEP/DAS server, the message format supported by default is configured via the Message Format property under Mapping Configuration when creating event receivers. In Siddhi files, this is replaced with the type parameter of the @map annotation, which defines the map type to be used to map the data.
Each @source and @sink configuration has a mapping denoted by the @map annotation that converts the incoming message format to Siddhi events.
Configuration in WSO2 DAS | <mapping customMapping="disable" type="map_type"/> |
---|---|
Configuration in Siddhi file | @map(type='map_type') |
The other parameters to be configured depend on the mapper selected. Some of these parameters are optional. For detailed information about the parameters, see the documentation for the relevant mapper.
The following is a list of currently supported source mapping types:
Map Attributes
@attributes is an optional annotation used with the @map annotation to define custom mapping; it replaces <mapping customMapping> in the CEP/DAS server. When the @attributes annotation is not provided, each mapper assumes that the incoming events adhere to its own default data format. By adding the @attributes annotation, you can configure mappers to extract data from the incoming message selectively and assign it to attributes.
There are two ways you can configure map attributes.
- Defining attributes as keys and mapping content as values in the following format.
@attributes( attributeN='mapping_N', attribute1='mapping_1')
- Defining the mapping content of all attributes in the same order in which the attributes are defined in the stream definition.
@attributes( 'mapping_1', 'mapping_N')
Example
This configuration receives events via the HTTP source in the JSON data format, and directs them to the InputStream stream for processing. Here, the HTTP source is configured to receive events on all network interfaces, on port 8080 and on the foo context. The source is also secured via basic authentication.
Configuration in WSO2 DAS | <eventReceiver ... xmlns="http://wso2.org/carbon/eventreceiver"> <from ... /> <mapping customMapping="enable" type="json"> <from streamName="sensor.stream" version="1.0.6"/> <property> <from dataType="meta" name="time"/> <to name="meta_timestamp" type="long"/> </property> <property> <from dataType="meta" name="meta_ispowerServed"/> <to name="meta_isPowerSaverEnabled" type="bool"/> </property> <property> <from dataType="meta" name="id"/> <to name="meta_sensorId" type="int"/> </property> </mapping> <to ... /> </eventReceiver> |
---|---|
Configuration in Siddhi file | @source(type='http', receiver.url='http://0.0.0.0:8080/foo', is.basic.auth.enabled='true', @map(type='json')) define stream InputStream (name string, age int, country string); |
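Note that the DAS receiver above uses custom mapping. In a Siddhi file, the equivalent custom mapping is defined with the @attributes annotation nested inside @map. The following is a minimal sketch, assuming the incoming JSON nests the values under a sensor element (the JSON paths are illustrative assumptions):

@source(type='http', receiver.url='http://0.0.0.0:8080/foo', is.basic.auth.enabled='true',
    @map(type='json',
        @attributes(name='$.sensor.name', age='$.sensor.age', country='$.sensor.country')))
define stream InputStream (name string, age int, country string);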
Event Store -> Table
In CEP/DAS, event streams are persisted by creating a corresponding table in the WSO2 Data Access Layer for batch analysis. In WSO2 Stream Processor, this functionality is replaced by tables. A table is a stored version of a stream or a table of events, and its schema is defined via a table definition that is similar to a stream definition.
These events are stored in-memory by default, but Siddhi also provides store extensions to work with data/events stored in various data stores through the table abstraction.
Tables allow Siddhi to work with stored events. By defining a schema for tables, Siddhi allows them to be processed by queries using their defined attributes with the streaming data. You can also interactively query the state of the stored events in the table.
@PrimaryKey('symbol')
define table StockTable (symbol string, price float, volume long);
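To persist the same table in an external data store instead of memory, add a @store annotation to the table definition. The following is a minimal sketch using the RDBMS store extension; the JDBC URL, credentials, and driver class are placeholders:

@store(type='rdbms', jdbc.url='jdbc:mysql://localhost:3306/stocks', username='root', password='root', jdbc.driver.name='com.mysql.jdbc.Driver')
@PrimaryKey('symbol')
define table StockTable (symbol string, price float, volume long);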
Indexes
Indexes allow tables to be searched/modified much faster.
Indexes are configured by adding the @Index( 'key1', 'key2' ) annotation to the table definition. Each event table configuration can have 0-1 @Index annotations. Support for the @Index annotation and the number of attributes supported differ based on the table implementation. When more than one attribute is used for the index, each of them is used to index the table for fast access of data. Indexes can be configured together with primary keys.
Example
This definition creates an indexed event table named RoomTypeTable with the roomNo attribute as the index key.
@Index('roomNo') define table RoomTypeTable (roomNo int, type string);
Execution Plan -> Queries
Queries used in DAS/CEP execution plans and in the Stream Processor are almost the same. However, a few newly introduced features of Siddhi 4.0.0 are used with Stream Processor 4.0.0. These features are listed below.
Incremental Aggregation
Incremental aggregation allows you to retrieve aggregate values for different time durations. That is, it allows you to obtain aggregates such as sum, count, avg, min, and max of stream attributes for durations such as sec, min, hour, etc.
The following is an example query.

define stream TradeStream (symbol string, price double, volume long, timestamp long);

define aggregation TradeAggregation
from TradeStream
select symbol, avg(price) as avgPrice, sum(price) as total
    group by symbol
    aggregate by timestamp every sec ... year;
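The aggregated values can then be retrieved for a selected time range and granularity by joining the aggregation with a stream. The following is a minimal sketch; the retrieval stream, output stream, and time range are illustrative assumptions:

define stream TradeSummaryRetrievalStream (symbol string);

from TradeSummaryRetrievalStream as b join TradeAggregation as a
    on a.symbol == b.symbol
    within "2017-06-01 00:00:00 +05:30", "2017-06-30 00:00:00 +05:30"
    per "days"
select a.symbol, a.total, a.avgPrice
insert into TradeSummaryStream;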
Set Keyword
The set keyword allows you to update selected attributes of a table. Here, for each assignment, the attribute specified on the left must be a table attribute, and the one specified on the right can be a stream/table attribute, a mathematical operation, or another expression.
When the set clause is not provided, all the attributes in the table are updated. This works with the update and update or insert operations.
The following is a sample query.
FROM fooStream
SELECT roomNo, time:timestampInMilliseconds() as ts
UPDATE barTable
    SET barTable.timestamp = ts
    ON barTable.room_no == roomNo AND roomNo > 2;
Pattern to identify non-occurrence of events
Patterns and sequences are key features of a complex event processor, used to define a new complex event based on the order of one or more raw events. A pattern can be an atomic pattern that detects the arrival of a specific event, or a complex pattern that detects the arrival of multiple events in a defined order. Although patterns generally define complex events based on the order of events that arrive, sometimes a complex event may depend on an event that should not arrive.
Usually, Siddhi pattern processors wait for events until they arrive. Once an event arrives, the pattern processor starts looking for the next event. When detecting events that have not arrived, the pattern processor must not wait for an infinite time period before declaring the non-arrival of the event. Therefore, a time interval to wait for the event must be defined with absent pattern operators, with the exception of a logical AND pattern that combines an event that is expected to arrive with an event that must not arrive beforehand. The following is a sample query.
from TemperatureStream[temp > 60] -> not FireAlarmStream[active == true] for 5 sec
select 'Fire alarm not working' as message
insert into AlertStream;
Defined Window
A defined window is a window that can be shared across multiple queries.
Events can be inserted into a defined window from one or more queries, and the window can produce output events based on the defined window type.
The following is a sample query.
define stream TempStream (tempId string, temp double);
define window OneMinTempWindow (tempId string, temp double) time(1 min);

from TempStream
select *
insert into OneMinTempWindow;
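Another query can then consume events from the same window, which is how the window is shared across queries. The following is a minimal sketch; the AvgTempStream output stream is an illustrative assumption:

from OneMinTempWindow
select tempId, avg(temp) as avgTemp
    group by tempId
insert into AvgTempStream;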
Upgrading the database
WSO2 SP stores product-specific data in H2 databases by default. Those databases are located in the <PRODUCT_HOME>/wso2/<Profile>/database directory.
This embedded H2 database is suitable for development, testing, and some production environments. However, we recommend that you use an industry-standard RDBMS such as Oracle, PostgreSQL, MySQL, or MS SQL because they are more suitable for most production environments. Most table schemas are self-generated by the relevant feature. For the other table schemas, you can use the scripts provided with WSO2 SP in the <SP_HOME>/wso2/<Profile>/dbscripts directory to install and configure databases. This directory includes scripts for Oracle, PostgreSQL, MySQL, and MS SQL.
CEP database migration is direct because it only uses RDBMS tables without any proprietary encoding.
DAS supports data persistence in two types of tables:
- RDBMS Event tables
- Analytics Tables (which also include Analytics Event Tables)
Analytics table migration is an indirect process in which the Analytics tables need to be converted into RDBMS tables by running Spark scripts that use the CarbonJDBC provider packaged with DAS.
The following is a sample query that can be run on DAS to migrate the ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA Analytics table into an RDBMS table.
CREATE TEMPORARY TABLE SMART_HOME_DATA
USING CarbonAnalytics
OPTIONS (tableName "ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA",
         schema "house_id INT, metro_area STRING, state STRING, device_id INT, power_reading FLOAT, is_peak BOOLEAN");

CREATE TEMPORARY TABLE SMART_HOME_DATA_RDBMS
USING CarbonJDBC
OPTIONS (dataSource "WSO2_SP_MIGRATION_DATA_STORE_DB",
         tableName "ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA",
         schema "house_id INTEGER -i, metro_area STRING, state STRING, device_id INTEGER, power_reading FLOAT, is_peak BOOLEAN");

INSERT INTO TABLE SMART_HOME_DATA_RDBMS SELECT * FROM SMART_HOME_DATA;
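Once the data has been migrated to an RDBMS table, a Siddhi application can access it as a store-backed table. The following is a minimal sketch, assuming a datasource with the same name is defined in the SP deployment configuration and that the RDBMS store extension is used:

@store(type='rdbms', datasource='WSO2_SP_MIGRATION_DATA_STORE_DB', table.name='ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA')
define table SmartHomeDataTable (house_id int, metro_area string, state string, device_id int, power_reading float, is_peak bool);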