...

This sample demonstrates how to send notifications through events published from WSO2 DAS using Apache Spark. The notifications alert you to records in an existing Spark table that satisfy one or more defined conditions. The sample involves creating a table with a few product names and quantities in the DAL, and sending notifications when the quantity of a product falls below a defined value.

...

  1. Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Streams.
  3. Click Add Event Stream.
  4. Enter the values as shown below to create an event stream named PRODUCTS_STREAM with two attributes: product name and quantity. For more information on creating event streams, see Understanding Event Streams and Event Tables.
    create the receiving event stream
  5. Click Next (Persist Event).
  6. Enter the values as shown below in the next screen to persist the created event stream. For more information on persisting event streams, see Persisting Event Streams.
    persisting the created event stream
  7. Click Save Event Stream.

Sending events to the receiving event stream

...

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Tools, and then click Event Simulator.
  3. Upload a CSV file containing the events to be sent (e.g. footwear.csv), and click Configure as shown below.
    simulating the sending of events to the receiving event stream
  4. Enter the details as shown below, and click Configure. 
    entering the event mapping configurations
  5. Click Play in the next screen as shown below.
    play events to simulate the sending of events
  6. Click Main, and then click Message Console and select Data Explorer. Select the name of the table to view it as shown below.
    view the products master table in the message console

Creating the corresponding table in Apache Spark

...

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Console.
  3. Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.

    CREATE TEMPORARY TABLE PRODUCTS_MASTER 
    USING CarbonAnalytics 
    OPTIONS (tableName "PRODUCTS_STREAM", 
             schema "product-name STRING,quantity INT");
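
To check that the temporary table maps onto the persisted events, a simple read query can be run in the same Spark console. This is only a sketch: it assumes the simulated events were persisted to PRODUCTS_STREAM in the earlier steps, and the rows returned depend entirely on the CSV file that was uploaded.

```sql
SELECT * FROM PRODUCTS_MASTER LIMIT 10;
```

If no rows come back, the likely cause is that the stream was not persisted or the events were not played, rather than a problem with the table definition.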

Creating the event stream to publish events from Spark

To publish events from Spark, you must first define an event stream with the attributes that Spark will publish.

  1. Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Streams.
  3. Click Add Event Stream.
  4. Enter the values as shown below to create an event stream named PRODUCT_ALERTS_STREAM with two attributes: product name and quantity. For more information on creating event streams, see Event Streams.
  5. Click Add Event Stream.

Creating the event receiver for the event stream

Once you define the event stream, you need to create an event receiver of the WSO2Event type to receive events from Spark. For more information, see WSO2Event Event Receiver.

  1. Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Receivers.
  3. Click Add Event Receiver.
  4. Enter the values as shown below to create an event receiver of the WSO2Event type for the PRODUCT_ALERTS_STREAM created above. For more information on creating event streams, see Event Streams.
  5. Click Add Event Receiver.

Configuring a publisher

You can attach an event publisher such as email or JMS to PRODUCT_ALERTS_STREAM to have the events delivered to a preferred location. Follow the steps below to configure a logger publisher that publishes output events from WSO2 DAS and writes the notifications to the terminal as logs. For more information on configuring event publishers, see Creating Alerts.

...

Follow the steps below to execute the sample.

Creating the Spark table which maps to the created event stream

...


Follow the steps below to create the publishing event stream named PRODUCT_ALERTS_STREAM, and the corresponding virtual table named PRODUCT_ALERTS in the Apache Spark environment within WSO2 DAS that maps to it.

  1. Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
  2. Click Main, and then click Console.
  3. Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.

    CREATE TEMPORARY TABLE PRODUCT_ALERTS 
    USING org.wso2.carbon.analytics.spark.event.EventStreamProvider 
    OPTIONS (receiverURL "tcp://<DAS_HOST>:<DAS_THRIFT_PORT>",
             username "<USERNAME>",
             password "<PASSWORD>",
             streamName "PRODUCT_ALERTS_STREAM",
             version "1.0.0",
             description "Events are published when product quantity goes beyond a certain level",
             nickName "product alerts",
             payload "product-name STRING,quantity INT"
    );
Note

Provide the appropriate values for DAS_HOST, DAS_THRIFT_PORT, USERNAME, and PASSWORD. The default values are localhost, 7611, admin, and admin, respectively.

Publishing events to the created event stream

...

When this query executes, the output of the select query is inserted into the PRODUCT_ALERTS table. The query reads all the products from the PRODUCTS_STREAM Spark table whose quantity is less than 50. During query execution, the individual rows returned by the select query are published to the PRODUCT_ALERTS stream as events.
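
Before publishing, it can help to preview which rows match the condition. The read-only query below is a sketch and not the sample's publishing query: it assumes that PRODUCTS_MASTER, the temporary table mapped to PRODUCTS_STREAM earlier in this sample, is registered in the current Spark environment, and it takes the threshold of 50 from the description above.

```sql
SELECT * FROM PRODUCTS_MASTER WHERE quantity < 50;
```

Each row returned here corresponds to one event that you would expect to be published to the alerts stream.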

  1. You can view the text events published to the CEP server in its logs in the CLI, as shown below.
    output logs of the logger publisher