Sending Notifications Through Published Events Using Spark
Introduction
This sample demonstrates how you can send notifications through events published from WSO2 DAS using Apache Spark. The notifications alert you when records of an existing table in the Spark environment satisfy one or more defined conditions. This sample involves creating a table with a few product names and quantities in the DAL, and sending notifications when the quantity of a product falls below a defined value.
Prerequisites
Set up the general prerequisites required for WSO2 DAS.
Building the sample
Follow the steps below to build the sample.
Creating the receiving event stream and the table in DAL
Follow the steps below to create a table named PRODUCTS together with the receiving event stream in the Data Access Layer (DAL).
- Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Streams.
- Click Add Event Stream.
- Enter the values as shown below to create an event stream named PRODUCTS_STREAM with two attributes as product name and quantity. For more information on creating event streams, see Understanding Event Streams and Event Tables.
- Click Next (Persist Event).
- Enter the values as shown below in the next screen to persist the created event stream. For more information on persisting event streams, see Persisting Event Streams.
- Click Save Event Stream.
Sending events to the receiving event stream
Follow the steps below to simulate the sending of events to the created receiving event stream.
- Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Tools, and then click Event Simulator.
- Upload a CSV file that contains the events to be sent (e.g. footwear.csv), and click Configure as shown below.
- Enter the details as shown below, and click Configure.
- Click Play in the next screen as shown below.
- Click Main, and then click Data Explorer. Select the name of the table to view it as shown below.
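If you do not have a CSV file at hand, the following Python sketch shows one way to generate one for the simulation steps above. The product names and quantities are hypothetical sample data, not part of the original sample. The layout (one event per line, product name followed by quantity, no header row) is an assumption based on the order of the attributes in PRODUCTS_STREAM, so adjust it if your simulator configuration expects something different.

```python
import csv
import io

# Hypothetical sample rows (product name, quantity); not taken from the original sample.
SAMPLE_PRODUCTS = [
    ("sneakers", 120),
    ("sandals", 3),
    ("boots", 45),
    ("loafers", 1),
]


def build_products_csv(rows):
    """Return CSV text with one event per line: product name, then quantity."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for name, quantity in rows:
        writer.writerow([name, int(quantity)])
    return buffer.getvalue()


def write_products_csv(path, rows):
    """Write the generated CSV text to the given file path."""
    with open(path, "w", newline="") as handle:
        handle.write(build_products_csv(rows))


if __name__ == "__main__":
    # Creates footwear.csv in the current directory, ready to upload.
    write_products_csv("footwear.csv", SAMPLE_PRODUCTS)
```

Including a few rows with small quantities makes sure that at least some records satisfy the alert condition used later in this sample.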
Creating the corresponding table in Apache Spark
Follow the steps below to create a virtual table in the Apache Spark environment within WSO2 DAS that maps to the PRODUCTS_STREAM table in the DAL.
- Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Console.
- Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.
CREATE TEMPORARY TABLE PRODUCTS_MASTER USING CarbonAnalytics OPTIONS (tableName "PRODUCTS_STREAM", schema "product-name STRING,quantity INT");
Creating the event stream to publish events from Spark
To publish events from Spark, you have to define an event stream with the attributes of the events that Spark publishes.
- Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Streams.
- Click Add Event Stream.
- Enter the values as shown below to create an event stream named PRODUCT_ALERTS_STREAM with two attributes as product name and quantity. For more information on creating event streams, see Event Streams.
- Click Add Event Stream.
Creating the event receiver for the event stream
Once you define the event stream, you need to create an event receiver of the WSO2Event type to receive events from Spark. For more information, see WSO2Event Event Receiver.
- Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Receivers.
- Click Add Event Receiver.
- Enter the values as shown below to create an event receiver of the WSO2Event type for the PRODUCT_ALERTS_STREAM created above. For more information on creating event streams, see Event Streams.
- Click Add Event Receiver.
Configuring a publisher
You can attach an event publisher such as email or JMS to the PRODUCT_ALERTS_STREAM, and get the events delivered to a preferred location. Follow the steps below to configure a publisher that publishes output events from WSO2 DAS, and sends notifications as logs in the terminal using a logger publisher. For more information on configuring event publishers, see Creating Alerts.
- Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Publishers.
- Enter the details as shown below to configure the publisher.
- Click Add Event Publisher.
Executing the sample
Follow the steps below to execute the sample.
Creating the Spark table which maps to the created event stream
Follow the steps below to create the corresponding virtual table named PRODUCT_ALERTS in the Apache Spark environment within WSO2 DAS that maps to the created event stream PRODUCT_ALERTS_STREAM.
- Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Console.
- Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.
CREATE TEMPORARY TABLE PRODUCT_ALERTS USING org.wso2.carbon.analytics.spark.event.EventStreamProvider OPTIONS (receiverURL "tcp://<DAS_HOST>:<DAS_THRIFT_PORT>", username "<USERNAME>", password "<PASSWORD>", streamName "PRODUCT_ALERTS_STREAM", version "1.0.0", description "Events are published when product quantity goes beyond a certain level", nickName "product alerts", payload "product-name STRING,quantity INT" );
Provide the appropriate values for DAS_HOST, DAS_THRIFT_PORT, USERNAME and PASSWORD. The default values are localhost, 7611, admin and admin, respectively.
Publishing events to the created event stream
Follow the steps below to publish output events to the created event stream.
- Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Console.
- Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.
INSERT OVERWRITE TABLE PRODUCT_ALERTS select * from PRODUCTS_MASTER where quantity<5;
When this query executes, the output of the select query is inserted into the PRODUCT_ALERTS table. The select query reads all the products with a quantity less than 5 from the PRODUCTS_MASTER Spark table, which maps to the PRODUCTS_STREAM table. During the query execution, the individual rows returned from the select query are published to the PRODUCT_ALERTS_STREAM stream as events.
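To preview which products would trigger a notification before you run the query, you can mimic its condition locally. The following Python sketch is only an illustration of the `quantity<5` filter in the query above. It does not connect to WSO2 DAS, and the product rows are hypothetical sample data.

```python
def rows_to_publish(products, threshold=5):
    """Return the (name, quantity) rows that satisfy quantity < threshold.

    Mirrors the condition of the select query locally; each returned row
    corresponds to one event that the query would publish.
    """
    return [(name, qty) for name, qty in products if qty < threshold]


# Hypothetical rows, standing in for the contents of PRODUCTS_MASTER.
products = [("sneakers", 120), ("sandals", 3), ("boots", 45), ("loafers", 1)]

for name, qty in rows_to_publish(products):
    print(f"alert: {name} quantity={qty}")
```

The comparison is strict, so a product whose quantity equals the threshold does not produce an event.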
- You can view the text events that are published to the CEP server in its logs in the CLI, as shown below.