Real-Time Data Stream Processing In Azure Part: 2

This article extends my previous post “Real-Time Data Stream Processing in Azure Part: 1” on real-time analytics. Before getting to the core of what you came here to read, let's quickly review a few basic concepts of Stream Analytics.
Azure Stream Analytics provides:
        SQL-like transformations and computations over streams of events (a sample query follows this list)
        A subset of T-SQL syntax
        Built-in functions
        Support for processing events in CSV, JSON, Avro, Parquet, and other data formats
        Support for complex types such as nested objects (records) and arrays
        Joins across multiple streams, including self-joins
        Windowing extensions over event time
        Azure ML integration
        Out-of-the-box Azure integrations
        Custom function support
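For instance, a minimal query (a sketch only, assuming the job has an input alias named iotinput and a hypothetical output alias named filteredout) that filters and reshapes events looks like this:

SELECT deviceId, temperature, humidity, System.Timestamp AS EventTime
INTO filteredout
FROM iotinput
WHERE temperature > 25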


Of the features above, the windowing extensions are particularly important; windowing is probably the biggest difference between analyzing static data and analyzing streaming data. You need to break up your stream of data into defined windows of time and then run queries, analysis functions, and aggregations over each window. The three important windowing extensions are described below:

Tumbling Window (fixed-size, non-overlapping)
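For example, the following sketch (assuming the input alias iotinput and a hypothetical output alias tumblingout) averages the temperature over non-overlapping 10-second windows:

SELECT System.Timestamp AS WindowEnd, AVG(temperature) AS AvgTemperature
INTO tumblingout
FROM iotinput
GROUP BY TumblingWindow(second, 10)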



Hopping Window:
Hopping windows have a fixed size but overlap, because they "hop" forward by an interval that can be smaller than the window size. Note: a tumbling window is a hopping window whose 'hop' is equal to its 'size'.
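A hypothetical example (same iotinput, with an output alias hoppingout): a 10-second window that hops forward every 5 seconds, so windows overlap and an event can land in more than one window:

SELECT System.Timestamp AS WindowEnd, AVG(temperature) AS AvgTemperature
INTO hoppingout
FROM iotinput
GROUP BY HoppingWindow(second, 10, 5)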



Sliding Window:
Produces output only when an event enters or exits the window.
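A sketch (again assuming iotinput, with an output alias slidingout) that reports how many events fall within the last 10 seconds:

SELECT System.Timestamp AS WindowEnd, COUNT(*) AS EventCount
INTO slidingout
FROM iotinput
GROUP BY SlidingWindow(second, 10)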




Also go through this document; I found some very useful solution patterns in it.

Real-Time Event Source:
To generate real-time IoT data, you can purchase the Azure MXChip IoT DevKit. The board, with a Micro-USB cable, costs USD $39.


You can follow this Microsoft article, which talks in detail about the setup of this device. To get more details about the device, follow here.

Raspberry Pi Web Simulator:

Another quick way is to use the Raspberry Pi online simulator. The Raspberry Pi is a low-cost, credit-card-sized computer that plugs into a computer monitor or TV and uses a standard keyboard and mouse.

Connect Raspberry Pi online simulator to Azure IoT Hub

The simulator has three windows:

Assembly area - The default circuit is a Pi connected to a BME280 sensor and an LED. This area is locked in the preview version, so currently you cannot customize it.

Coding area - An online code editor for you to code with the Raspberry Pi. The default sample application collects sensor data from the BME280 sensor and sends it to your Azure IoT Hub. The application is fully compatible with real Pi devices.

Integrated console window - It shows the output of your code. At the top of this window, there are three buttons.

Run - Run the application in the coding area.

Reset - Reset the coding area to the default sample application.

Fold/Expand - On the right side there is a button for you to fold/expand the console window.

Line 15 of the sample code (highlighted) is where you need to paste the primary connection string of the IoT device created in your IoT Hub. Just click Run, and the simulator will start sending messages to IoT Hub.

Sample messages sent:


Sending message: {"messageId":1,"deviceId":"Raspberry Pi Web Client","temperature":30.887271587926808,"humidity":77.08763030185419}

Message sent to Azure IoT Hub

Sending message: {"messageId":2,"deviceId":"Raspberry Pi Web Client","temperature":29.596987342882528,"humidity":68.70138365486578}

Message sent to Azure IoT Hub

Sending message: {"messageId":3,"deviceId":"Raspberry Pi Web Client","temperature":24.48454310680801,"humidity":66.51982520835813}

Message sent to Azure IoT Hub

Databricks Sample Streaming Data:

You can also use sample streaming data directly on Azure Databricks; it can be found at the path below:

/databricks-datasets/structured-streaming/events/

Use it like below:

%fs ls /databricks-datasets/structured-streaming/events/

%fs head /databricks-datasets/structured-streaming/events/file-0.json
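To take a quick peek at the data before streaming it, you can also query one of the JSON files directly with Spark SQL (a sketch; it simply selects a few rows from the sample file shown above):

%sql
SELECT * FROM json.`/databricks-datasets/structured-streaming/events/file-0.json` LIMIT 5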

Now that you have decided on the streaming IoT source and the events are arriving at IoT Hub, the next step is to use Azure Stream Analytics to write the events into Data Lake Gen2 in both CSV and Parquet format. In the same Stream Analytics job, I also created one more query that outputs the average temperature every 2 seconds to a Power BI dataset.

SELECT * INTO csvout FROM iotinput

SELECT * INTO parquetout FROM iotinput

SELECT System.Timestamp AS WindowEnd, AVG(temperature) AS Temperature
INTO powerbiout
FROM iotinput
GROUP BY TumblingWindow(second, 2)



In Power BI, you can collect these events in a dataset and create real-time charts like the one below:

For the Parquet events received, I created a view in Azure Databricks like below:

import org.apache.spark.sql.functions._

// Read one batch of the Parquet output to retrieve its schema
val Parquet_Schema = spark.read.parquet("/mnt/iotdata/parquet15/2020/06/15/")

val streamingInputDF = spark
  .readStream
  .schema(Parquet_Schema.schema)                  // inferSchema didn't work, so reuse Parquet_Schema
  .option("maxFilesPerTrigger", 1)                // Treat a sequence of files as a stream by picking one file at a time
  .parquet("/mnt/iotdata/parquet15/2020/06/24/")  // Read Parquet streaming data

streamingInputDF.createOrReplaceTempView("stream_view")   // Create a temp view on the streaming data

 

Similar to what we did for Power BI, compute the average temperature over a window of 1 second:

%sql
-- Tumbling window to calculate average temperature
SELECT
  date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'), "1 second").end, "HH:mm:ss") time,
  AVG(temperature) temperature
FROM stream_view
GROUP BY date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'), "1 second").end, "HH:mm:ss")
ORDER BY 1 DESC

To convert this data to Delta format, we need to use checkpointing, as shown below:

// Starting Parquet-to-Delta conversion with checkpointing
/* outputMode options are:
   Complete Mode: The entire updated result table is written to external storage.
   Append Mode: Applicable only for queries where existing rows in the result table are not expected to change.
   Update Mode: Only the rows that were updated in the result table since the last trigger are written to external storage.
   This differs from Complete Mode in that Update Mode outputs only the rows that have changed since the last trigger.
   If the query doesn't contain aggregations, it is equivalent to Append Mode.
*/
streamingInputDF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/iotdata/iotdata_delta_chk24/_checkpoints/etl-from-parquet")
  .start("/mnt/iotdata/iotdata_delta_chk24/")

Now we can read the checkpointed Delta data as another stream, create a view on it, and run similar windowing operations on that view as well (see the query after the code below).

// Reading the Delta stream
import org.apache.spark.sql.functions._

val deltaDF = spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/iotdata/iotdata_delta_chk24/")

deltaDF.createOrReplaceTempView("stream_view_delta")
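For example, the same tumbling-window aggregation used earlier on stream_view can be run against this Delta-backed view (a sketch reusing the query above):

%sql
-- Tumbling window to calculate average temperature from the Delta stream
SELECT
  date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'), "1 second").end, "HH:mm:ss") time,
  AVG(temperature) temperature
FROM stream_view_delta
GROUP BY date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'), "1 second").end, "HH:mm:ss")
ORDER BY 1 DESC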

 
