
Upgrading from a Previous Release

This section explains how to upgrade from WSO2 DAS/CEP to WSO2 SP.


Overview

WSO2 DAS 3.1.0 is the predecessor of WSO2 SP 4.0.0. Similar to SP, DAS processed events via an event flow that consisted of event streams, receivers, publishers, and execution plans. These elements of the event flow are defined separately from each other via the DAS Management Console.

WSO2 SP defines the complete event flow within a single application created via a Siddhi file. The application is then deployed in an SP worker node and executed at runtime. Due to this architectural difference between the two products, configurations cannot be directly migrated from DAS to SP. Instead, they need to be recreated as explained in this section.

WSO2 SP's streaming SQL capabilities and its built-in editor, which supports event simulation and debugging, help you create real-time applications much faster than before. WSO2 SP 4.0.0 is a new product that focuses on stream processing and complex event processing use cases.

WSO2 SP uses a single Siddhi Streaming SQL file to script the data collection, processing, and notification logic. The batch analytics aspect is handled via Siddhi aggregations.

For more information about the key capabilities of WSO2 SP, see About This Release.

Deployable Artifacts

Siddhi applications are the deployable artifact type of the Stream Processor.

To use Siddhi, you need to write the processing logic as a Siddhi application in the Siddhi Streaming SQL language. Once a Siddhi application is created and started, it does the following:

  1. Takes data one by one as events.
  2. Processes the data in each event.
  3. Generates new high-level events based on the processing carried out up to the current time.
  4. Sends the newly generated events as output to streams.



Elements of Siddhi SQL can be composed together as a script in a Siddhi application. Here, each construct must be separated by a semicolon ( ; ) as shown in the syntax below.

<siddhi app>  : 
       <app annotation> * 
       ( <stream definition> | <table definition> | ... ) + 
       ( <query> | <partition> ) +
       ;

The following is a sample Siddhi application named Temperature-Analytics that includes a stream named TempStream and a query named 5minAvgQuery to process the events handled by it.

@app:name('Temperature-Analytics')

define stream TempStream (deviceID long, roomNo int, temp double);

@name('5minAvgQuery')
from TempStream#window.time(5 min)
select roomNo, avg(temp) as avgTemp
 group by roomNo
insert into OutputStream;
 
The following are the element types in your DAS setup that you can redefine in a Siddhi application so that you can reuse them with WSO2 SP.

Event Streams -> Stream Definition

A stream, such as the TempStream stream, unifies common types of events together. This enables them to be processed via queries using their defined attributes in a streaming manner, and allows sinks and sources to map events to/from various data formats.

When migrating event stream definitions in WSO2 DAS, you must rewrite them in the syntax followed in Siddhi applications as illustrated in the table below.

Configuration in WSO2 DAS
{
  "streamId": "TempStream:1.0.0",
  "name": "TempStream",
  "version": "1.0.0",
  "metaData": [
    {
      "name": "ip",
      "type": "STRING"
    }
  ],
  "correlationData": [
    {
      "name": "id",
      "type": "LONG"
    }
  ],
  "payloadData": [
    {
      "name": "deviceID",
      "type": "LONG"
    },
    {
      "name": "roomNo",
      "type": "INT"
    },
    {
      "name": "temp",
      "type": "DOUBLE"
    }
  ]
}
Configuration in Siddhi file
define stream TempStream (deviceID long, roomNo int, temp double);
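
The Siddhi definition above carries over only the payload attributes. If the metaData and correlationData attributes are also needed downstream, one possible approach is to declare them as ordinary stream attributes. The sketch below assumes the meta_ and correlation_ name prefixes that DAS execution plans use for such attributes. This naming is a convention chosen for illustration, not a requirement of WSO2 SP.

```siddhi
-- Assumption: meta and correlation attributes from the DAS stream are kept
-- as plain attributes, with prefixed names to show where they came from.
define stream TempStream (meta_ip string, correlation_id long, deviceID long, roomNo int, temp double);
```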


Event Receiver -> Source

In WSO2 CEP/DAS, events are received by event receivers that manage the event retrieval process. In Siddhi files deployed in WSO2 SP, you need to configure sources instead of event receivers to receive events.

To configure a stream that consumes events via a source, add the source configuration to a stream definition by adding the @source annotation with the required parameter values.

Configuration in WSO2 DAS
<eventReceiver name="test_event_receiver" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="event_type">
        <property name="static.option.key1">static_option_value1</property>
        <property name="static.option.keyN">static_option_valueN</property>
    </from>
    <mapping customMapping="disable" type="map_type"/>
    <to streamName="testEventStream" version="1.0.0"/>
</eventReceiver>
Configuration in Siddhi file
@source(type='source_type', static.option.key1='static_option_value1', static.option.keyN='static_option_valueN',
    @map(type='map_type', static.option.key1='static_option_value1', static.option.keyN='static_option_valueN',
        @attributes( attributeN='attribute_mapping_N', attribute1='attribute_mapping_1')
    )
)
define stream testEventStream (attribute1 Type1, attributeN TypeN);

The type parameter of @source defines the source type that receives events. The other parameters to be configured depend on the source type selected. Some of the parameters are optional.

For detailed information about the parameters, see the documentation for the relevant source.

The following is the list of source types that are currently supported:

Event Publisher -> Sink

In WSO2 CEP/DAS, events are published via event publishers that manage the event publishing process. In Siddhi files deployed in WSO2 SP, you need to configure sinks instead of event publishers to publish events.

To configure a stream that provides events via a sink, add the sink configuration to a stream definition by adding the @sink annotation with the required parameter values.

Configuration in WSO2 DAS
<eventPublisher name="httpLogger"
  statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="org.wso2.event.sensor.stream" version="1.0.0"/>
  <mapping customMapping="disable" type="text"/>
  <to eventAdapterType="logger">
    <property name="uniqueId">org.wso2.event.statistics.logger</property>
  </to>
</eventPublisher>

Configuration in Siddhi file
@sink(type='log', @map(type='text'))
define stream sensorStream (sensorId int, temperature double);


For detailed information about the parameters, see the documentation for the relevant sink.
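
To show how a receiver, an execution plan, and a publisher collapse into one deployable file, the following is a minimal sketch of a complete Siddhi application. The application name, the stream names, the /sensors context, and the temperature threshold are all hypothetical. The source and sink types are the http and log types already used in this section.

```siddhi
@app:name('SensorLogger')

-- Takes the place of a DAS event receiver: accepts JSON events over HTTP.
@source(type='http', receiver.url='http://0.0.0.0:8080/sensors', @map(type='json'))
define stream SensorInputStream (sensorId int, temperature double);

-- Takes the place of a DAS event publisher: logs each event as text.
@sink(type='log', @map(type='text'))
define stream HighTemperatureStream (sensorId int, temperature double);

-- Takes the place of a DAS execution plan.
@name('HighTemperatureQuery')
from SensorInputStream[temperature > 60.0]
select sensorId, temperature
insert into HighTemperatureStream;
```

Keeping the source, query, and sink in one file means the whole event flow is deployed and versioned as a single artifact.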

The following is a list of currently supported sink types.

Sink and Source Mappers

In the WSO2 CEP/DAS server, the default format supported for the Message Format property is configured under Mapping Configuration when you create event receivers. In Siddhi files, this is replaced by the type parameter of the @map annotation, which defines the map type used to map the data.

Each @source and @sink configuration has a mapping denoted by the @map annotation. For a source, it converts the incoming message format to Siddhi events. For a sink, it converts Siddhi events to the outgoing message format.

Configuration in WSO2 DAS
<mapping customMapping="disable" type="map_type"/>
Configuration in Siddhi file
@map(type='map_type')

The other parameters to be configured depend on the mapper selected. Some of these parameters are optional. For detailed information about the parameters, see the documentation for the relevant mapper.

The following is a list of currently supported source mapping types:

Map Attributes

@attributes is an optional annotation used with the @map annotation to define custom mapping that replaces <mapping customMapping> in the CEP/DAS server. When the @attributes annotation is not provided, each mapper assumes that the incoming events adhere to its own default data format. By adding the @attributes annotation, you can configure mappers to extract data from the incoming message selectively, and assign them to attributes.

There are two ways you can configure map attributes.

  1. Defining attributes as keys and mapping content as values, in the following format.
    @attributes( attributeN='mapping_N', attribute1='mapping_1')
  2. Defining the mapping content of all attributes in the same order in which the attributes are defined in the stream definition.
    @attributes( 'mapping_1', 'mapping_N')

Example

This configuration receives events via the HTTP source in the JSON data format, and directs them to the InputStream stream for processing. Here, the HTTP source is configured to receive events on all network interfaces, on port 8080 and the foo context. The source is also secured via basic authentication.

Configuration in WSO2 DAS
<eventReceiver ... xmlns="http://wso2.org/carbon/eventreceiver">
    <from ... />
    <mapping customMapping="enable" type="json">
        <from streamName="sensor.stream" version="1.0.6"/>
        <property>
            <from dataType="meta" name="time"/>
            <to name="meta_timestamp" type="long"/>
        </property>
        <property>
            <from dataType="meta" name="meta_ispowerServed"/>
            <to name="meta_isPowerSaverEnabled" type="bool"/>
        </property>
        <property>
            <from dataType="meta" name="id"/>
            <to name="meta_sensorId" type="int"/>
        </property>
    </mapping>
    <to ... />
</eventReceiver>
Configuration in Siddhi file
@source(type='http', receiver.url='http://0.0.0.0:8080/foo', is.basic.auth.enabled='true', 
  @map(type='json'))
define stream InputStream (name string, age int, country string);
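
The Siddhi configuration above relies on the JSON mapper's default format and does not use @attributes. As a rough sketch of what custom mapping might look like for the same stream, the following binds each attribute to a JSON path. The paths ($.person.name and so on) are hypothetical and depend entirely on the shape of the incoming payload.

```siddhi
-- Key-value form: each stream attribute is bound to a JSON path.
-- The paths below are illustrative assumptions, not taken from a real payload.
@source(type='http', receiver.url='http://0.0.0.0:8080/foo', is.basic.auth.enabled='true',
  @map(type='json',
    @attributes(name='$.person.name', age='$.person.age', country='$.person.country')))
define stream InputStream (name string, age int, country string);

-- Positional form of the same mapping, listed in stream-attribute order:
--   @attributes('$.person.name', '$.person.age', '$.person.country')
```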


Event Store -> Table

In CEP/DAS, event streams are persisted for batch analysis by creating a corresponding table in the WSO2 Data Access Layer. In WSO2 Stream Processor, this functionality is replaced by tables. A table is a stored version of a stream or a table of events. Its schema is defined via a table definition that is similar to a stream definition.

These events are stored in-memory by default, but Siddhi also provides store extensions to work with data/events stored in various data stores through the table abstraction.

Tables allow Siddhi to work with stored events. By defining a schema for tables, Siddhi allows them to be processed by queries using their defined attributes with the streaming data. You can also interactively query the state of the stored events in the table.


@PrimaryKey('symbol')
define table StockTable (symbol string, price float, volume long);
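
As an illustration of the store extensions mentioned above, the following sketch backs the same table with an RDBMS instead of memory. The JDBC URL, credentials, and driver class are placeholder assumptions for a local MySQL instance, and StockStream is a hypothetical input stream. Check the RDBMS store documentation for the parameters supported by your version.

```siddhi
define stream StockStream (symbol string, price float, volume long);

-- Placeholder connection details: replace with the values for your database.
@Store(type='rdbms', jdbc.url='jdbc:mysql://localhost:3306/stocks',
       username='root', password='root',
       jdbc.driver.name='com.mysql.jdbc.Driver')
@PrimaryKey('symbol')
define table StockTable (symbol string, price float, volume long);

-- Persists every incoming stock event into the table.
from StockStream
select symbol, price, volume
insert into StockTable;
```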

Indexes

Indexes allow tables to be searched/modified much faster.

Indexes are configured by including the @Index( 'key1', 'key2' ) annotation in the table definition. Each event table configuration can have at most one @Index annotation. Support for the @Index annotation and the number of attributes supported differ based on the table implementation. When more than one attribute is used for the index, each of them is used to index the table for fast access to the data. Indexes can be configured together with primary keys.

Example

This query creates an indexed event table named RoomTypeTable with the roomNo attribute as the index key.

@Index('roomNo')
define table RoomTypeTable (roomNo int, type string);
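
Since indexes can be combined with primary keys, a variation of the table above might look like the following sketch. It also adds a join to show where the faster lookup would matter. The choice of roomNo as the primary key and type as the index, and the RoomTempStream output stream, are assumptions made for illustration.

```siddhi
define stream TempStream (deviceID long, roomNo int, temp double);

-- roomNo identifies a row uniquely; type is indexed for faster searches.
@PrimaryKey('roomNo')
@Index('type')
define table RoomTypeTable (roomNo int, type string);

-- Enriches each temperature event with the room type looked up by roomNo.
from TempStream join RoomTypeTable
  on TempStream.roomNo == RoomTypeTable.roomNo
select deviceID, RoomTypeTable.type as roomType, temp
insert into RoomTempStream;
```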
      

Execution Plan -> Queries

Queries used in DAS/CEP execution plans and in Stream Processor are almost the same. Siddhi 4.0.0, which is used with Stream Processor 4.0.0, introduces a few new features. These features are listed below.

  • Incremental Aggregation

    Incremental aggregation allows you to retrieve aggregate values for different time durations. That is, it allows you to obtain aggregates (such as sum, count, avg, min, and max) of stream attributes for durations such as sec, min, hour, etc.

    The following is an example query.

    define stream TradeStream (symbol string, price double, volume long, timestamp long);
    
    define aggregation TradeAggregation
      from TradeStream
      select symbol, avg(price) as avgPrice, sum(price) as total
        group by symbol
        aggregate by timestamp every sec ... year;
  • Set keyword

    The set keyword allows you to update selected attributes of the table.

    Here, for each assignment, the attribute specified on the left must be a table attribute, and the one specified on the right can be a stream/table attribute, a mathematical operation, or another expression.

    When the set clause is not provided, all the attributes in the table are updated. This works with the update and update or insert operations.

    The following is a sample query.

    FROM fooStream
    SELECT roomNo, time:timestampInMilliseconds() as ts
    UPDATE barTable
        SET barTable.timestamp = ts
        ON  barTable.room_no == roomNo AND roomNo > 2;
  • Pattern to identify non-occurrence of events

    Patterns and sequences are the key features of a complex event processor to define a new complex event based on the order of one or more raw events. A pattern can be an atomic pattern that detects the arrival of a specific event, or a complex pattern that detects the arrival of multiple events in a defined order. Although patterns generally define complex events based on the order of events that arrive, sometimes a complex event may depend on an event that should not arrive.

    Usually, Siddhi pattern processors wait for events until they arrive. Once an event arrives, the pattern processor starts looking for the next event. When detecting events that have not arrived, the pattern processor cannot wait indefinitely before declaring the non-arrival of the event. Therefore, absent pattern operators must specify a time interval to wait for the event. The exception is a logical AND pattern that combines an event that is expected to arrive with an event that must not arrive before it.

    The following is a sample query.

    from TemperatureStream[temp > 60] -> not FireAlarmStream[active == true] for 5 sec
    select 'Fire alarm not working' as message
    insert into AlertStream;
  • Defined Window

    A defined window is a window that can be shared across multiple queries. 

    Events can be inserted into a defined window from one or more queries, and it can produce output events based on the defined window type.

    The following is a sample query.

    define stream TempStream(tempId string, temp double);
    define window OneMinTempWindow(tempId string, temp double) time(1 min);
    
    from TempStream
    select *
    insert into OneMinTempWindow;
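
The aggregation and defined window samples above show only how each construct is populated. The following sketch shows how each might be read. RequestStream, TradeSummaryStream, and MaxTempStream are hypothetical names, and the within range and per granularity are arbitrary example values.

```siddhi
-- Reading from a defined window --------------------------------------------
define stream TempStream (tempId string, temp double);
define window OneMinTempWindow (tempId string, temp double) time(1 min);

from TempStream
select *
insert into OneMinTempWindow;

-- A second query that consumes the shared window's output.
from OneMinTempWindow
select tempId, max(temp) as maxTemp
group by tempId
insert into MaxTempStream;

-- Reading from an incremental aggregation ----------------------------------
define stream TradeStream (symbol string, price double, volume long, timestamp long);

define aggregation TradeAggregation
  from TradeStream
  select symbol, avg(price) as avgPrice, sum(price) as total
    group by symbol
    aggregate by timestamp every sec ... year;

-- Each event on this (hypothetical) stream triggers a lookup.
define stream RequestStream (symbol string);

-- Retrieves per-day aggregates for the requested symbol within a time range.
from RequestStream as R join TradeAggregation as T
  on R.symbol == T.symbol
  within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
  per "days"
select R.symbol, T.avgPrice, T.total
insert into TradeSummaryStream;
```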

Upgrading the database

WSO2 SP stores product-specific data in H2 databases by default. Those databases are located in the <SP_HOME>/wso2/<Profile>/database directory.

This embedded H2 database is suitable for development, testing, and some production environments. However, we recommend an industry-standard RDBMS such as Oracle, PostgreSQL, MySQL, or MS SQL, because these are more suitable for most production environments. Most table schemas are self-generated by the feature itself. For the other table schemas, you can use the scripts provided with WSO2 SP in the <SP_HOME>/wso2/<Profile>/dbscripts directory to install and configure databases. This directory includes scripts for Oracle, PostgreSQL, MySQL, and MS SQL.

CEP database migration is direct because CEP only uses RDBMS tables without any proprietary encoding.

DAS supports data persistence in two types of databases.

  1. RDBMS Event tables

  2. Analytics Tables (which also include Analytics Event Tables)

RDBMS Event table migration is straightforward because WSO2 SP supports RDBMS.

Analytics table migration is an indirect process in which you need to convert the Analytics Tables into RDBMS tables by running Spark scripts that use the CarbonJDBC provider packed with DAS.

The following is a sample script that can be run on DAS to migrate the ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA event table into an RDBMS table.

CREATE TEMPORARY TABLE SMART_HOME_DATA
USING CarbonAnalytics OPTIONS (tableName "ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA", schema "house_id INT, metro_area STRING, state STRING, device_id INT, power_reading FLOAT, is_peak BOOLEAN");

CREATE TEMPORARY TABLE SMART_HOME_DATA_RDBMS USING CarbonJDBC OPTIONS (dataSource "WSO2_SP_MIGRATION_DATA_STORE_DB", tableName "ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA", schema "house_id INTEGER -i, metro_area STRING, state STRING, device_id INTEGER, power_reading FLOAT, is_peak BOOLEAN");


INSERT INTO TABLE SMART_HOME_DATA_RDBMS SELECT * FROM SMART_HOME_DATA;                                           
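
Once the data is in the RDBMS, a Siddhi application could read it through a table backed by the RDBMS store. The following is a sketch only. It assumes that a datasource with the same name is also configured on the SP side, that the Siddhi attribute types shown are compatible with the column types the script created, and that HouseLookupStream and HouseReadingStream are hypothetical streams.

```siddhi
@app:name('SmartHomeDataReader')

-- Assumes WSO2_SP_MIGRATION_DATA_STORE_DB is defined as a datasource in SP
-- and points at the database the Spark script wrote to.
@Store(type='rdbms', datasource='WSO2_SP_MIGRATION_DATA_STORE_DB')
define table ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA (house_id int, metro_area string, state string, device_id int, power_reading float, is_peak bool);

-- Hypothetical stream: each event asks for the readings of one house.
define stream HouseLookupStream (house_id int);

from HouseLookupStream as L join ORG_WSO2_DAS_SAMPLE_SMART_HOME_DATA as D
  on L.house_id == D.house_id
select D.house_id, D.device_id, D.power_reading, D.is_peak
insert into HouseReadingStream;
```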


