Real-time Anomaly detection on Azure

Anomaly Detection is a very powerful pattern and mostly, 70% time used. Azure Stream Analytics is having built-in ML-based Anomaly detection. It is based on the Un-supervised learning model i.e. model does not come with any pre-training, it starts learning with no of events and starts scoring on the fly and will work for all flavor of data. It has not only temporary anomalies like spike and dips (which is generally easy to implement) but also can be used with persistence anomalies which slowly +/-ve trends. For example memory leaks in a VM are slowly changing anomalies. Example for both is below:

/*Spike and dip: Temporary anomalies in a time series event stream.  
Assumes a uniform input rate of one event per second in a 2-minute sliding window with a history of 120 events. The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 95%.  

WITH AnomalyDetectionStep AS  
        CAST(temperature AS floatAS temp,  
        AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')  
            OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores  
    FROM iotinput  
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score'AS floatAS  
    CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly'AS bigintAS  
INTO powerbiout  
FROM AnomalyDetectionStep  

/*Change point: Persistent anomalies in a time series event stream  
Assumes a uniform input rate of one event per second in a 20-minute sliding window with a history size of 1200 events. The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 80%.  

WITH AnomalyDetectionStep AS  
        CAST(temperature AS floatAS temp,  
        AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200)   
        OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores  
    FROM iotinput  
    CAST(GetRecordPropertyValue(ChangePointScores, 'Score'AS floatAS  
    CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly'AS bigintAS  
INTO powerbiout  
FROM AnomalyDetectionStep 

On Azure Databricks here is one good article on Microsoft document to implement real-time anomaly detection. Here is my version of the same code.


// Anomaly Detection Client  
import{BufferedReader, DataOutputStream, InputStreamReader}  
import java.sql.Timestamp  
import{Gson, GsonBuilder, JsonParser}  
case class Point(var timestamp: Timestamp, var value: Double)  
case class Series(var series: Array[Point], var maxAnomalyRatio: Double, var sensitivity: Int, var granularity: String)  
case class AnomalySingleResponse(var isAnomaly: Boolean, var isPositiveAnomaly: Boolean, var isNegativeAnomaly: Boolean, var period: Int, var expectedValue: Double, var upperMargin: Double, var lowerMargin: Double, var suggestedWindow: Int)  
case class AnomalyBatchResponse(var expectedValues: Array[Double], var upperMargins: Array[Double], var lowerMargins: Array[Double], var isAnomaly: Array[Boolean], var isPositiveAnomaly: Array[Boolean], var isNegativeAnomaly: Array[Boolean], var period: Int)  
object AnomalyDetector extends Serializable {  
  // Cognitive Services API connection settings  
  val subscriptionKey = "[Placeholder: Your Anomaly Detector resource access key]"  
  val endpoint = "[Placeholder: Your Anomaly Detector resource endpoint]"  
  val latestPointDetectionPath = "/anomalydetector/v1.0/timeseries/last/detect"  
  val batchDetectionPath = "/anomalydetector/v1.0/timeseries/entire/detect";  
  val latestPointDetectionUrl = new URL(endpoint + latestPointDetectionPath)  
  val batchDetectionUrl = new URL(endpoint + batchDetectionPath)  
  val gson: Gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").setPrettyPrinting().create()  
  def getConnection(path: URL): HttpsURLConnection = {  
    val connection = path.openConnection().asInstanceOf[HttpsURLConnection]  
    connection.setRequestProperty("Ocp-Apim-Subscription-Key", subscriptionKey)  
    return connection  
  // Handles the call to Cognitive Services API.  
  def processUsingApi(request: String, path: URL): String = {  
    val encoded_text = request.getBytes("UTF-8")  
    val connection = getConnection(path)  
    val wr = new DataOutputStream(connection.getOutputStream())  
    wr.write(encoded_text, 0, encoded_text.length)  
    val response = new StringBuilder()  
    val in = new BufferedReader(new InputStreamReader(connection.getInputStream()))  
    var line = in.readLine()  
    while (line != null) {  
      line = in.readLine()  
    return response.toString()  
  // Calls the Latest Point Detection API.  
  def detectLatestPoint(series: Series): Option[AnomalySingleResponse] = {  
    try {  
      println("Process Timestamp: " + series.series.apply(series.series.length-1).timestamp.toString + ", size: " + series.series.length)  
      val response = processUsingApi(gson.toJson(series), latestPointDetectionUrl)  
      // Deserializing the JSON response from the API into Scala types  
      val anomaly = gson.fromJson(response, classOf[AnomalySingleResponse])  
      return Some(anomaly)  
    } catch {  
      case e: Exception => {  
        return None  
  // Calls the Batch Detection API.  
  def detectBatch(series: Series): Option[AnomalyBatchResponse] = {  
    try {  
      val response = processUsingApi(gson.toJson(series), batchDetectionUrl)  
      // Deserializing the JSON response from the API into Scala types  
      val anomaly = gson.fromJson(response, classOf[AnomalyBatchResponse])  
      return Some(anomaly)  
    } catch {  
      case e: Exception => {  
        return None  

// User Defined Aggregation Function for Anomaly Detection  
import org.apache.spark.sql.Row  
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}  
import org.apache.spark.sql.types.{StructType, TimestampType, FloatType, MapType, BooleanType, DataType}  
import scala.collection.immutable.ListMap  
class AnomalyDetectorAggregationFunction extends UserDefinedAggregateFunction {  
  override def inputSchema: StructType = new StructType().add("timestamp", TimestampType).add("value", FloatType)  
  override def bufferSchema: StructType = new StructType().add("point", MapType(TimestampType, FloatType))  
  override def dataType: DataType = BooleanType  
  override def deterministic: Boolean = false  
  override def initialize(buffer: MutableAggregationBuffer): Unit = {  
    buffer(0) = Map()  
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {  
    buffer(0) = buffer.getAs[Map[java.sql.Timestamp, Float]](0) + (input.getTimestamp(0) -> input.getFloat(1))  
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {  
    buffer1(0) = buffer1.getAs[Map[java.sql.Timestamp, Float]](0) ++ buffer2.getAs[Map[java.sql.Timestamp, Float]](0)  
  override def evaluate(buffer: Row): Any = {  
    val points = buffer.getAs[Map[java.sql.Timestamp, Float]](0)  
    if (points.size > 12) {  
      val sorted_points = ListMap(points.toSeq.sortBy(_._1.getTime):_*)  
      var detect_points: List[Point] = List()  
      sorted_points.keys.foreach {  
        key => detect_points = detect_points :+ new Point(key, sorted_points(key))  
      // 0.25 is maxAnomalyRatio. It represents 25%, max anomaly ratio in a time series.  
      // 95 is the sensitivity of the algorithms.  
      // Check Anomaly detector API reference (  
      val series: Series = new Series(detect_points.toArray, 0.2595"hourly")  
      val response: Option[AnomalySingleResponse] = AnomalyDetector.detectLatestPoint(series)  
      if (!response.isEmpty) {  
        return response.get.isAnomaly  
    return None  
// Getting Delta Stram  
val aggrData = spark.sql("""  
select window(current_timestamp, 5).end as groupTime, avg(temperature) averageTemp  
from stream_view_delta  
group by window(current_timestamp, 5).end  
order by groupTime""")  

// Final Anomaly Detection using the UDF created above
import java.time.Instant  
import java.time.format.DateTimeFormatter  
import java.time.ZoneOffset  
import java.time.temporal.ChronoUnit  
val detectData ="delta").table("stream_view_for_anomaly")  
// You could use Databricks to schedule an hourly job and always monitor the latest data point  
// Or you could specify a const value here for testing purpose  
// For example, val endTime = Instant.parse("2019-04-16T00:00:00Z")  
val endTime =  
// This is when your input of anomaly detection starts. It is hourly time series in this tutorial, so 72 means 72 hours ago from endTime.  
val batchSize = 72  
val startTime = endTime.minus(batchSize, ChronoUnit.HOURS)  
val DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);  
val series = detectData.filter($"groupTime" <= DATE_TIME_FORMATTER.format(endTime))  
  .filter($"groupTime" > DATE_TIME_FORMATTER.format(startTime))  
// Register the function to access it  
spark.udf.register("anomalydetect", new AnomalyDetectorAggregationFunction)  
val adResult = spark.sql("SELECT '" + endTime.toString + "' as datetime, anomalydetect(groupTime, averageTemp) as anomaly FROM series")  

