Sending Notifications Through Published Events Using Spark
Introduction
This sample demonstrates how to send notifications through events published from WSO2 DAS using Apache Spark. The notifications alert you when records of an existing table in the Spark environment satisfy one or more defined conditions. The sample creates a table with a few product names and quantities in the Data Access Layer (DAL), and sends notifications when the quantity of a product falls below a defined value.
Prerequisites
Set up the general prerequisites required for WSO2 DAS.
Building the sample
Follow the steps below to build the sample.
Creating the receiving event stream and the table in DAL
Follow the steps below to create a table named PRODUCTS together with the receiving event stream in the Data Access Layer (DAL).
- Log in to the DAS management console using the following URL: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Streams.
- Click Add Event Stream.
- Enter the values as shown below to create an event stream named PRODUCTS_STREAM with two attributes: product name and quantity. For more information on creating event streams, see Understanding Event Streams and Event Tables.
- Click Next (Persist Event).
- Enter the values as shown below in the next screen to persist the created event stream. For more information on persisting event streams, see Persisting Event Streams.
- Click Save Event Stream.
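For reference, the stream created in the steps above can also be written out as a JSON definition. The sketch below builds one in Python, assuming the usual WSO2 stream definition layout (name, version, payloadData). The version 1.0.0, the attribute name product-name, and the STRING and INT types are assumptions based on the Spark queries later in this sample, so check them against the values you entered in the console.

```python
import json


def build_stream_definition(name, version, attributes):
    """Return a stream definition dict with the given payload attributes."""
    return {
        "name": name,
        "version": version,
        "nickName": "",
        "description": "",
        "payloadData": [
            {"name": attr_name, "type": attr_type}
            for attr_name, attr_type in attributes
        ],
    }


# Assumed values: version and attribute names/types are not stated in the steps above.
PRODUCTS_STREAM = build_stream_definition(
    "PRODUCTS_STREAM",
    "1.0.0",
    [("product-name", "STRING"), ("quantity", "INT")],
)

if __name__ == "__main__":
    # Print the definition as formatted JSON for comparison with the console.
    print(json.dumps(PRODUCTS_STREAM, indent=2))
```

The helper only assembles and prints the definition; it does not deploy anything to the server.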
Sending events to the receiving event stream
Follow the steps below to simulate the sending of events to the created receiving event stream.
- Log in to the DAS management console using the following URL, if you are not already logged in: https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Tools, and then click Event Simulator.
- Upload a CSV file (e.g. footwear.csv) that contains the events to be sent, and click Configure as shown below.
- Enter the details as shown below, and click Configure.
- Click Play in the next screen as shown below.
- Click Main, and then click Data Explorer. Select the name of the table to view it as shown below.
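If you do not have a CSV file at hand, the following sketch generates one. The product names and quantities are made up for illustration, and the layout (comma-separated, one event per line, no header row, product name followed by quantity) is an assumption; adjust it to match the settings you enter when configuring the simulator.

```python
import csv
import io

# Hypothetical sample events: (product name, quantity). Not part of the original sample.
SAMPLE_ROWS = [
    ("sandals", 3),
    ("sneakers", 120),
    ("boots", 4),
    ("loafers", 48),
]


def build_csv(rows):
    """Return CSV text with one event per line and no header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


def write_csv(path, rows):
    """Write the events to a file that can be uploaded to the simulator."""
    with open(path, "w", newline="") as handle:
        handle.write(build_csv(rows))
```

Calling write_csv("footwear.csv", SAMPLE_ROWS) creates the file in the current directory. Two of the sample quantities are below 5 so that the alert query later in this sample returns rows.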
Creating the corresponding table in Apache Spark
Follow the steps below to create a virtual table in the Apache Spark environment within WSO2 DAS that maps to the PRODUCTS_STREAM table in the DAL.
- Log in to the DAS management console using the following URL, if you are not already logged in:
https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Console.
- Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.
CREATE TEMPORARY TABLE PRODUCTS_MASTER USING CarbonAnalytics OPTIONS (tableName "PRODUCTS_STREAM", schema "product-name STRING,quantity INT");
Creating the event stream to publish events from Spark
To publish events from Spark, you must define an event stream whose attributes match the events that Spark will publish.
- Log in to the DAS management console using the following URL:
https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Streams.
- Click Add Event Stream.
- Enter the values as shown below to create an event stream named PRODUCT_ALERTS_STREAM with two attributes: product name and quantity. For more information on creating event streams, see Event Streams.
- Click Add Event Stream.
Creating the event receiver for the event stream
Once you define the event stream, you need to create an event receiver of the WSO2Event type to receive events from Spark. For more information, see Configuring Event Receivers.
- Log in to the DAS management console using the following URL:
https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Receivers.
- Click Add Event Receiver.
- Enter the values as shown below to create an event receiver of the WSO2Event type for the PRODUCT_ALERTS_STREAM event stream created above. For more information on creating event streams, see Event Streams.
- Click Add Event Receiver.
Configuring a publisher
You can attach an event publisher such as email or JMS to the PRODUCT_ALERTS_STREAM, and get the events delivered to a preferred location. Follow the steps below to configure a publisher that publishes output events from WSO2 DAS and sends notifications as log entries in the terminal using a logger publisher. For more information on configuring event publishers, see Creating Alerts.
- Log in to the DAS management console using the following URL, if you are not already logged in:
https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Publishers.
- Enter the details as shown below to configure the publisher.
- Click Add Event Publisher.
Executing the sample
Follow the steps below to execute the sample.
Creating the Spark table which maps to the created event stream
Follow the steps below to create the corresponding virtual table named PRODUCT_ALERTS in the Apache Spark environment within WSO2 DAS, which maps to the created event stream PRODUCT_ALERTS_STREAM.
- Log in to the DAS management console using the following URL, if you are not already logged in:
https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Console.
- Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.
CREATE TEMPORARY TABLE PRODUCT_ALERTS USING org.wso2.carbon.analytics.spark.event.EventStreamProvider OPTIONS (streamName "PRODUCT_ALERTS_STREAM", version "1.0.0", payload "product-name STRING, quantity INT" );
Note
The EventStreamProvider writes event data to a central analytics table. A scheduled task in the receiver nodes picks up this event data and dispatches the events to the streams. The scheduled task polls the source analytics table at 10-second intervals to check whether any event data is pending. If required, you can explicitly disable this scheduled task in a node by setting the Java system property disableSparkEventingTask to true (e.g. ./wso2server.sh|bat -DdisableSparkEventingTask=true).
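The write-then-poll behaviour described in this note can be pictured with a small model. The sketch below is a conceptual illustration only and is not DAS code: the class name, its methods, and the in-memory queue that stands in for the central analytics table are all hypothetical.

```python
from collections import deque

POLL_INTERVAL_SECONDS = 10  # polling interval stated in the note


class EventingTaskModel:
    """Hypothetical model of the scheduled task that dispatches pending events."""

    def __init__(self, dispatch, disabled=False):
        self.pending = deque()    # stands in for the central analytics table
        self.dispatch = dispatch  # callback that delivers an event to a stream
        self.disabled = disabled  # models -DdisableSparkEventingTask=true

    def write(self, stream_name, payload):
        """Store an event until the next poll, as the provider does on insert."""
        self.pending.append((stream_name, payload))

    def poll_once(self):
        """Dispatch every pending event and return how many were sent."""
        if self.disabled:
            return 0
        sent = 0
        while self.pending:
            stream_name, payload = self.pending.popleft()
            self.dispatch(stream_name, payload)
            sent += 1
        return sent


if __name__ == "__main__":
    received = []
    task = EventingTaskModel(lambda name, payload: received.append((name, payload)))
    task.write("PRODUCT_ALERTS_STREAM", {"product-name": "boots", "quantity": 4})
    # In the real server this step would run every POLL_INTERVAL_SECONDS.
    print(task.poll_once(), received)
```

The model shows why an event may take up to one polling interval to reach the stream, and why a node with the task disabled never dispatches the events it holds.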
Publishing events to the created event stream
Follow the steps below to publish output events to the created event stream.
- Log in to the DAS management console using the following URL, if you are not already logged in:
https://<DAS_HOST>:<DAS_PORT>/carbon/
- Click Main, and then click Console.
- Enter the following Spark SQL query in the Spark console, and press the Enter key to execute it.
INSERT OVERWRITE TABLE PRODUCT_ALERTS select * from PRODUCTS_MASTER where quantity<5;
When this query executes, the output of the select query is inserted into the PRODUCT_ALERTS table. The select query reads all the products in the PRODUCTS_MASTER Spark table (which maps to the PRODUCTS_STREAM table) that have a quantity less than 5. During the query execution, the individual rows returned by the select query are published to the PRODUCT_ALERTS_STREAM stream as events.
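The selection performed by the query can be sketched in plain Python as follows. This is an illustration of the logic only, not how DAS executes it: the table contents are made up, the event dictionary layout is hypothetical, and the threshold of 5 is taken from the quantity<5 condition in the query.

```python
THRESHOLD = 5  # from the `quantity<5` condition in the query above

# Hypothetical contents of PRODUCTS_MASTER; the names and quantities are made up.
PRODUCTS_MASTER = [
    {"product-name": "sandals", "quantity": 3},
    {"product-name": "sneakers", "quantity": 120},
    {"product-name": "boots", "quantity": 4},
    {"product-name": "loafers", "quantity": 48},
]


def select_low_stock(rows, threshold=THRESHOLD):
    """Mimic `select * ... where quantity<threshold`: keep rows below the threshold."""
    return [row for row in rows if row["quantity"] < threshold]


def to_events(rows, stream_name="PRODUCT_ALERTS_STREAM", version="1.0.0"):
    """Wrap each selected row as one event, mirroring one event per returned row."""
    return [
        {"stream": stream_name, "version": version, "payload": dict(row)}
        for row in rows
    ]


if __name__ == "__main__":
    for event in to_events(select_low_stock(PRODUCTS_MASTER)):
        print(event)
```

Because the comparison is strict, a product with a quantity of exactly 5 does not produce an alert.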
- You can view the text events that are published to the CEP server in its logs in the CLI as shown below.