Anomaly detection is a very powerful pattern and one that comes up most of the time (easily 70% of streaming scenarios, in my experience). Azure Stream Analytics has built-in ML-based anomaly detection. It is based on an unsupervised learning model, i.e. the model does not come with any pre-training; it starts learning from the incoming events, scores them on the fly, and works with all flavors of data. It covers not only temporary anomalies such as spikes and dips (which are generally easy to implement) but also persistent anomalies, i.e. slowly increasing or decreasing trends. For example, a memory leak in a VM is a slowly changing anomaly. Examples of both are below:
/*Spike and dip: Temporary anomalies in a time series event stream.
Assumes a uniform input rate of one event per second in a 2-minute sliding window with a history of 120 events. The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 95%.
*/
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM iotinput
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS IsSpikeAndDipAnomaly
INTO powerbiout
FROM AnomalyDetectionStep
/*Change point: Persistent anomalies in a time series event stream
Assumes a uniform input rate of one event per second in a 20-minute sliding window with a history size of 1200 events. The final SELECT statement extracts and outputs the score and anomaly status with a confidence level of 80%.
*/
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
FROM iotinput
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS IsChangePointAnomaly
INTO powerbiout
FROM AnomalyDetectionStep
For Azure Databricks, there is a good article in the Microsoft documentation on implementing real-time anomaly detection with the Anomaly Detector Cognitive Service. Here is my version of the same code.
//
// Anomaly Detection Client
//
import java.io.{BufferedReader, DataOutputStream, InputStreamReader}
import java.net.URL
import java.sql.Timestamp
import com.google.gson.{Gson, GsonBuilder, JsonParser}
import javax.net.ssl.HttpsURLConnection
case class Point(var timestamp: Timestamp, var value: Double)
case class Series(var series: Array[Point], var maxAnomalyRatio: Double, var sensitivity: Int, var granularity: String)
case class AnomalySingleResponse(var isAnomaly: Boolean, var isPositiveAnomaly: Boolean, var isNegativeAnomaly: Boolean, var period: Int, var expectedValue: Double, var upperMargin: Double, var lowerMargin: Double, var suggestedWindow: Int)
case class AnomalyBatchResponse(var expectedValues: Array[Double], var upperMargins: Array[Double], var lowerMargins: Array[Double], var isAnomaly: Array[Boolean], var isPositiveAnomaly: Array[Boolean], var isNegativeAnomaly: Array[Boolean], var period: Int)
object AnomalyDetector extends Serializable {
// Cognitive Services API connection settings
val subscriptionKey = "[Placeholder: Your Anomaly Detector resource access key]"
val endpoint = "[Placeholder: Your Anomaly Detector resource endpoint]"
val latestPointDetectionPath = "/anomalydetector/v1.0/timeseries/last/detect"
val batchDetectionPath = "/anomalydetector/v1.0/timeseries/entire/detect";
val latestPointDetectionUrl = new URL(endpoint + latestPointDetectionPath)
val batchDetectionUrl = new URL(endpoint + batchDetectionPath)
val gson: Gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").setPrettyPrinting().create()
def getConnection(path: URL): HttpsURLConnection = {
val connection = path.openConnection().asInstanceOf[HttpsURLConnection]
connection.setRequestMethod("POST")
connection.setRequestProperty("Content-Type", "text/json")
connection.setRequestProperty("Ocp-Apim-Subscription-Key", subscriptionKey)
connection.setDoOutput(true)
return connection
}
// Handles the call to Cognitive Services API.
def processUsingApi(request: String, path: URL): String = {
println(request)
val encoded_text = request.getBytes("UTF-8")
val connection = getConnection(path)
val wr = new DataOutputStream(connection.getOutputStream())
wr.write(encoded_text, 0, encoded_text.length)
wr.flush()
wr.close()
val response = new StringBuilder()
val in = new BufferedReader(new InputStreamReader(connection.getInputStream()))
var line = in.readLine()
while (line != null) {
response.append(line)
line = in.readLine()
}
in.close()
return response.toString()
}
// Calls the Latest Point Detection API.
def detectLatestPoint(series: Series): Option[AnomalySingleResponse] = {
try {
println("Process Timestamp: " + series.series.apply(series.series.length-1).timestamp.toString + ", size: " + series.series.length)
val response = processUsingApi(gson.toJson(series), latestPointDetectionUrl)
println(response)
// Deserializing the JSON response from the API into Scala types
val anomaly = gson.fromJson(response, classOf[AnomalySingleResponse])
Thread.sleep(5000) // pause between calls to avoid hitting the service's request rate limit
return Some(anomaly)
} catch {
case e: Exception => {
println(e)
e.printStackTrace()
return None
}
}
}
// Calls the Batch Detection API.
def detectBatch(series: Series): Option[AnomalyBatchResponse] = {
try {
val response = processUsingApi(gson.toJson(series), batchDetectionUrl)
println(response)
// Deserializing the JSON response from the API into Scala types
val anomaly = gson.fromJson(response, classOf[AnomalyBatchResponse])
Thread.sleep(5000) // pause between calls to avoid hitting the service's request rate limit
return Some(anomaly)
} catch {
case e: Exception => {
println(e)
return None
}
}
}
}
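Just to make the client's contract clear before wiring it into Spark, here is a small, purely illustrative smoke test. The timestamps and temperature values are made up, and the 0.25 / 95 / "hourly" settings simply mirror the ones used in the aggregation function below.
//
// Hypothetical smoke test for the AnomalyDetector client above (sample data only)
//
import java.sql.Timestamp
import java.time.Instant
import java.time.temporal.ChronoUnit
val nowHour = Instant.now().truncatedTo(ChronoUnit.HOURS)
// 13 hourly points; the UDAF below also waits for more than 12 points before calling the service.
val samplePoints: Array[Point] = (0 until 13).map { i =>
  val ts = Timestamp.from(nowHour.minus((12 - i).toLong, ChronoUnit.HOURS))
  val value = if (i == 12) 95.0 else 20.0 + i * 0.1 // last point is an obvious outlier
  Point(ts, value)
}.toArray
val sampleSeries = Series(samplePoints, 0.25, 95, "hourly")
AnomalyDetector.detectLatestPoint(sampleSeries) match {
  case Some(resp) => println(s"isAnomaly=${resp.isAnomaly}, expectedValue=${resp.expectedValue}")
  case None => println("Call failed; check the subscription key and endpoint placeholders above.")
}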
//
// User Defined Aggregation Function for Anomaly Detection
//
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{StructType, TimestampType, FloatType, MapType, BooleanType, DataType}
import scala.collection.immutable.ListMap
class AnomalyDetectorAggregationFunction extends UserDefinedAggregateFunction {
override def inputSchema: StructType = new StructType().add("timestamp", TimestampType).add("value", FloatType)
override def bufferSchema: StructType = new StructType().add("point", MapType(TimestampType, FloatType))
override def dataType: DataType = BooleanType
override def deterministic: Boolean = false
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Map()
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Map[java.sql.Timestamp, Float]](0) + (input.getTimestamp(0) -> input.getFloat(1))
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Map[java.sql.Timestamp, Float]](0) ++ buffer2.getAs[Map[java.sql.Timestamp, Float]](0)
}
override def evaluate(buffer: Row): Any = {
val points = buffer.getAs[Map[java.sql.Timestamp, Float]](0)
if (points.size > 12) {
val sorted_points = ListMap(points.toSeq.sortBy(_._1.getTime):_*)
var detect_points: List[Point] = List()
sorted_points.keys.foreach {
key => detect_points = detect_points :+ new Point(key, sorted_points(key))
}
// 0.25 is maxAnomalyRatio, i.e. at most 25% of the points in the series may be flagged as anomalies.
// 95 is the sensitivity of the algorithm.
// See the Anomaly Detector API reference (https://aka.ms/anomaly-detector-rest-api-ref)
val series: Series = new Series(detect_points.toArray, 0.25, 95, "hourly")
val response: Option[AnomalySingleResponse] = AnomalyDetector.detectLatestPoint(series)
if (!response.isEmpty) {
return response.get.isAnomaly
}
}
// Fewer than 13 points accumulated (or the service call failed): return null so the result column is simply empty.
return null
}
}
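The same aggregation function can also be invoked through the DataFrame API instead of Spark SQL. Here is a minimal sketch, assuming a DataFrame named series with the groupTime and averageTemp columns produced by the aggregation query later in this post; it is equivalent to the SQL call at the end.
// Hypothetical: apply the UDAF via the DataFrame API instead of Spark SQL
val anomalyUdaf = new AnomalyDetectorAggregationFunction
val adResultDf = series.agg(anomalyUdaf($"groupTime", $"averageTemp".cast("float")).as("anomaly"))
display(adResultDf)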
//
// Getting Delta Stream
//
val aggrData = spark.sql("""
select window(current_timestamp, '5 minutes').end as groupTime, avg(temperature) as averageTemp
from stream_view_delta
group by window(current_timestamp, '5 minutes').end
order by groupTime""")
aggrData.createOrReplaceTempView("stream_view_for_anomaly")
display(aggrData)
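Note that the stream_view_delta view used above is assumed to already exist from the earlier ingestion steps of the pipeline. If you are trying this standalone, a minimal sketch to create it from a Delta table (the path is only a placeholder) could look like this:
// Hypothetical setup for stream_view_delta, assuming raw IoT telemetry already lands in a Delta table
spark.read
  .format("delta")
  .load("/delta/iot_events") // placeholder path, adjust to your environment
  .createOrReplaceTempView("stream_view_delta")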
//
// Final Anomaly Detection using the UDAF created above
//
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.time.ZoneOffset
import java.time.temporal.ChronoUnit
val detectData = spark.read.format("delta").table("stream_view_for_anomaly")
// You could use Databricks to schedule an hourly job and always monitor the latest data point
// Or you could specify a constant value here for testing purposes
// For example, val endTime = Instant.parse("2019-04-16T00:00:00Z")
val endTime = Instant.now()
// This is where the input to anomaly detection starts. The tutorial uses an hourly time series, so 72 means 72 hours before endTime.
val batchSize = 72
val startTime = endTime.minus(batchSize, ChronoUnit.HOURS)
val DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);
val series = detectData.filter($"groupTime" <= DATE_TIME_FORMATTER.format(endTime))
.filter($"groupTime" > DATE_TIME_FORMATTER.format(startTime))
.sort($"groupTime")
series.createOrReplaceTempView("series")
//display(series)
// Register the function to access it
spark.udf.register("anomalydetect", new AnomalyDetectorAggregationFunction)
val adResult = spark.sql("SELECT '" + endTime.toString + "' as datetime, anomalydetect(groupTime, averageTemp) as anomaly FROM series")
display(adResult)
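If you want to keep a history of these verdicts rather than just display them, one option (purely illustrative, the table name is a placeholder) is to append each result to a Delta table:
// Hypothetical: persist each detection result for later reporting
adResult.write
  .format("delta")
  .mode("append")
  .saveAsTable("anomaly_results")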