Detecting Anomalies
Introduction
In the previous two tutorials, you looked at how machine learning can be used in conjunction with the real-time processing capabilities offered by the Stream Processor. Now, let's consider a common use case: anomaly detection. In this tutorial, you will look at how the outlier function offered by the time series execution extension can be used to detect anomalies in a stream of events.
The Stream Processor offers various time series-based anomaly detection mechanisms, such as lengthTime and outlier. In addition, clustering algorithms such as K-Means and ClusTree can also be used for detecting outliers.
In this scenario, the Stream Processor receives a stream of events from the sugar syrup supplier. This stream contains information on the temperature, density and viscosity of the syrup supplied. The factory foreman needs to monitor the supply of sugar syrup and ensure that the density and the viscosity of each shipment are within an acceptable range. This requires a mechanism that evaluates the viscosity reading of each syrup shipment and produces an alert if the viscosity is not in the expected range for the given temperature and density. The outlier calculation must also be based on the last five shipments received, so that at any given time the detection is carried out against the latest events.
Tutorial steps
Let's get started!
Let's start by defining a stream for the input received from the supplier. This needs to contain the temperature, density and viscosity of the new shipment.
define stream SugarSyrupDataStream (viscosity double, temperature double, density double);
Next, let's define an output stream to be used for alerting if an outlier is found.
define stream OutlierStream (viscosity double, temperature double, density double, outlier bool);
Now let's add a simple query to select all input and add it to the OutlierStream stream as follows.
from SugarSyrupDataStream
select *
insert into OutlierStream;
This query does not function at this stage because the OutlierStream stream contains an additional attribute named outlier that is not included in the definition of the SugarSyrupDataStream stream.
To enable the outlier execution extension, let's add the timeseries namespace. This namespace indicates that a time series extension is being invoked.
from SugarSyrupDataStream#timeseries
select *
insert into OutlierStream;
The function to be used for anomaly detection is outlier(). Therefore, let's add it as follows.
from SugarSyrupDataStream#timeseries:outlier()
select *
insert into OutlierStream;
Next, let's add parameters for the outlier function as follows.
from SugarSyrupDataStream#timeseries:outlier(5, viscosity, temperature, density)
select *
insert into OutlierStream;
The outlier extension accepts a set of parameters. The following parameters are used in the query above.
- The window size: This specifies the length of the sliding window to be considered when the outlier is calculated. In this scenario, you need to consider five shipments. Therefore, the number of events must be 5.
- The dependent variable: This is the variable whose change needs to be evaluated based on the given independent variables and a linear function. The factory foreman is interested in detecting anomalies in viscosity based on the changes in temperature and density. Therefore, the viscosity is the dependent variable in this scenario.
- A set of independent variables: These dictate the nature of the graph, and form the baseline against which the changes in the dependent variable are measured. In this scenario, the temperature and the density can be considered the independent variables.
The above parameters are relevant for this user scenario. For the complete list of parameters that can be used with the outlier function, see Siddhi Query Guide 4.0 - Time Series - outlier.
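To make the roles of the dependent and independent variables more concrete, the following Python sketch illustrates the general idea of regression-based outlier detection over a sliding window. It is not the implementation used by the time series extension. The class name SlidingOutlierDetector, the helper functions, and the threshold of three standard errors are all assumptions made for illustration. The sketch fits viscosity = b0 + b1*temperature + b2*density to the last five events and flags a new event whose viscosity lies far from the fitted value.

```python
from collections import deque


def solve_linear_system(a, b):
    """Solve a * x = b by Gaussian elimination with partial pivoting."""
    n = len(b)
    # Work on an augmented copy so the inputs are left untouched.
    m = [row[:] + [b[i]] for i, row in enumerate(a)]
    for col in range(n):
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        if abs(m[pivot][col]) < 1e-12:
            raise ValueError("singular system (independent variables are collinear)")
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(col + 1, n):
            factor = m[r][col] / m[col][col]
            for c in range(col, n + 1):
                m[r][c] -= factor * m[col][c]
    x = [0.0] * n
    for i in reversed(range(n)):
        known = sum(m[i][j] * x[j] for j in range(i + 1, n))
        x[i] = (m[i][n] - known) / m[i][i]
    return x


def fit_plane(rows):
    """Least-squares fit of y = b0 + b1*x1 + b2*x2 over (y, x1, x2) rows."""
    design = [[1.0, x1, x2] for _, x1, x2 in rows]
    ys = [y for y, _, _ in rows]
    # Normal equations: (X^T X) beta = X^T y
    xtx = [[sum(r[i] * r[j] for r in design) for j in range(3)] for i in range(3)]
    xty = [sum(r[i] * y for r, y in zip(design, ys)) for i in range(3)]
    return solve_linear_system(xtx, xty)


class SlidingOutlierDetector:
    """Hypothetical detector: flags events far from a regression on recent events."""

    def __init__(self, window_size=5, threshold=3.0):
        if window_size < 4:
            raise ValueError("window_size must exceed the 3 fitted coefficients")
        self.window = deque(maxlen=window_size)  # keeps only the latest events
        self.threshold = threshold  # assumed cut-off, in standard errors

    def check(self, viscosity, temperature, density):
        """Return True if the event is an outlier, then add it to the window."""
        is_outlier = False
        if len(self.window) == self.window.maxlen:
            is_outlier = self._deviates(viscosity, temperature, density)
        self.window.append((viscosity, temperature, density))
        return is_outlier

    def _deviates(self, viscosity, temperature, density):
        rows = list(self.window)
        try:
            b0, b1, b2 = fit_plane(rows)
        except ValueError:
            return False  # no usable baseline, so do not raise an alert
        residuals = [y - (b0 + b1 * x1 + b2 * x2) for y, x1, x2 in rows]
        dof = len(rows) - 3  # observations minus fitted coefficients
        std_err = (sum(r * r for r in residuals) / dof) ** 0.5
        predicted = b0 + b1 * temperature + b2 * density
        return abs(viscosity - predicted) > self.threshold * std_err
```

In this sketch, the first five calls to check() always return False because the window is not yet full. After that, each new shipment is compared against a baseline built only from the five shipments that preceded it.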
This completes the query. The complete Siddhi application looks as follows.
@App:name('SugarSyrupOutlierPredictionApp')

@source(type='http', receiver.url='http://localhost:5007/SugarSyrupEP', @map(type = 'json'))
define stream SugarSyrupDataStream (viscosity double, temperature double, density double);

@sink(type='log', prefix='Outlier detected in sugar syrup supply:')
define stream OutlierStream (viscosity double, temperature double, density double, outlier bool);

from SugarSyrupDataStream#timeseries:outlier(5, viscosity, temperature, density)
select *
insert into OutlierStream;
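To try out the application, you need to send shipment readings to the HTTP source. The following Python sketch shows one way to build and post such events using only the standard library. The URL is taken from the @source annotation above. The {"event": {...}} envelope is an assumption based on the default format of the JSON mapper, so adjust it if your mapping is customized. The function names are hypothetical.

```python
import json
import urllib.request

# Endpoint taken from the @source annotation of SugarSyrupDataStream.
RECEIVER_URL = "http://localhost:5007/SugarSyrupEP"


def build_event_payload(viscosity, temperature, density):
    """Serialize one reading as JSON bytes.

    Assumption: the JSON mapper expects the attributes wrapped in an
    "event" object, which is its default format.
    """
    event = {
        "viscosity": viscosity,
        "temperature": temperature,
        "density": density,
    }
    return json.dumps({"event": event}).encode("utf-8")


def build_request(viscosity, temperature, density, url=RECEIVER_URL):
    """Create the POST request for one reading without sending it."""
    return urllib.request.Request(
        url,
        data=build_event_payload(viscosity, temperature, density),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_event(viscosity, temperature, density, url=RECEIVER_URL):
    """Post one reading to the running application and return the HTTP status."""
    request = build_request(viscosity, temperature, density, url)
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

With the Siddhi application deployed and running, calling send_event(116.1, 20.0, 1.30) several times with varying readings should cause events to appear in the log through the log sink. Nothing is sent until send_event() is called.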