The simulator has three areas:
Assembly area - The default circuit connects a Pi with a BME280 sensor and an LED. This area is locked in the preview version, so currently you cannot customize it.
Coding area - An online code editor for you to code with the Raspberry Pi. The default sample application collects sensor data from the BME280 sensor and sends it to your Azure IoT Hub. The application is fully compatible with real Pi devices.
Integrated console window - Shows the output of your code. At the top of this window, there are three buttons.
Run - Runs the application in the coding area.
Reset - Resets the coding area to the default sample application.
Fold/Expand - On the right side, there is a button to fold/expand the console window.
The highlighted line 15 is where you need to paste the primary connection string of the IoT device created in IoT Hub. Once you run the sample, it starts sending messages to IoT Hub.
Click the `Run` button to run the sample code (while the sample is running, the code is read-only).
Click the `Stop` button to stop the running sample.
Click the `Reset` button to reset the code. Your changes to the editor are kept even if you refresh the page.
Sample messages sent:
> Sending message: {"messageId":1,"deviceId":"Raspberry Pi Web Client","temperature":30.887271587926808,"humidity":77.08763030185419}
> Message sent to Azure IoT Hub
> Sending message: {"messageId":2,"deviceId":"Raspberry Pi Web Client","temperature":29.596987342882528,"humidity":68.70138365486578}
> Message sent to Azure IoT Hub
> Sending message: {"messageId":3,"deviceId":"Raspberry Pi Web Client","temperature":24.48454310680801,"humidity":66.51982520835813}
> Message sent to Azure IoT Hub
Databricks Sample Streaming Data:
You can also use the sample streaming data available directly on Azure Databricks, which can be found at the path below:
/databricks-datasets/structured-streaming/events/
Use it as shown below:
%fs ls /databricks-datasets/structured-streaming/events/
%fs head /databricks-datasets/structured-streaming/events/file-0.json
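If you prefer to prototype without a live IoT source, this sample dataset can itself be read as a stream, in the same way the Parquet files are read later in this post. Below is a minimal sketch in Scala; the eventSchema and sample_events_view names are only illustrative, and the time/action fields are the schema this sample dataset is commonly documented with.
import org.apache.spark.sql.types._

// Minimal sketch: read the sample event files as a stream (names below are illustrative)
val eventSchema = new StructType()
  .add("time", TimestampType)
  .add("action", StringType)

val sampleEventsDF = spark.readStream
  .schema(eventSchema)              // streaming file sources need an explicit schema
  .option("maxFilesPerTrigger", 1)  // pick up one file per trigger to simulate a stream
  .json("/databricks-datasets/structured-streaming/events/")

sampleEventsDF.createOrReplaceTempView("sample_events_view")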
Now that the streaming IoT source is decided and events are arriving at IoT Hub, the next step is to use Azure Stream Analytics to write the events into Data Lake Gen2 in both CSV and Parquet format. In the same Stream Analytics job, I created one more query that outputs the average temperature every 2 seconds to a Power BI dataset.
SELECT * INTO csvout FROM iotinput
SELECT * INTO parquetout FROM iotinput
SELECT System.Timestamp AS WindowEnd, AVG(temperature) as Temperature
INTO powerbiout FROM iotinput
GROUP BY TumblingWindow(second, 2)
For the Parquet events received, I created a view in Azure Databricks like below:
import org.apache.spark.sql.functions._

val Parquet_Schema = spark.read.parquet("/mnt/iotdata/parquet15/2020/06/15/") // read once as a batch to retrieve the schema
val streamingInputDF = spark
  .readStream
  .schema(Parquet_Schema.schema)    // inferSchema did not work, so reuse Parquet_Schema
  .option("maxFilesPerTrigger", 1)  // treat a sequence of files as a stream by picking one file at a time
  .parquet("/mnt/iotdata/parquet15/2020/06/24/") // read the Parquet files as streaming data

streamingInputDF.createOrReplaceTempView("stream_view") // create a temp view on the stream data
Similar to what we did for Power BI, we compute the average temperature over a window of 1 second:
%sql
--Tumbling window to calculate average temperature
SELECT
date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'),"1 second").end,"HH:mm:ss") time,
AVG(temperature) temperature
FROM stream_view
GROUP BY date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'),"1 second").end,"HH:mm:ss")
ORDER BY 1 DESC
To convert this data to Delta format, we need to use checkpointing like below:
// Start the Parquet to Delta conversion with a checkpoint
/* The available output modes are:
   Complete Mode: The entire updated result table is written to external storage.
   Append Mode: Applicable only to queries where existing rows in the result table are not expected to change.
   Update Mode: Only the rows that were updated in the result table since the last trigger are written to external storage. This differs from Complete Mode in that Update Mode outputs only the rows that have changed since the last trigger. If the query doesn't contain aggregations, it is equivalent to Append Mode.
*/
streamingInputDF.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/iotdata/iotdata_delta_chk24/_checkpoints/etl-from-parquet")
  .start("/mnt/iotdata/iotdata_delta_chk24/")
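To confirm that the streaming write is landing data, the same Delta path can also be read back as a regular batch DataFrame. This is just a quick sanity check, assuming at least one micro-batch has committed:
// Quick sanity check: read the Delta output of the streaming query above as a batch DataFrame
val deltaBatchDF = spark.read
  .format("delta")
  .load("/mnt/iotdata/iotdata_delta_chk24/")

println(s"Rows written so far: ${deltaBatchDF.count()}")
deltaBatchDF.show(5, false)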
Now we can use the checkpointed Delta data to create another streaming view, and we can run the same windowing operations on it as well.
// Reading the Delta stream
import org.apache.spark.sql.functions._

val deltaDF = spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/iotdata/iotdata_delta_chk24/")

deltaDF.createOrReplaceTempView("stream_view_delta")
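With stream_view_delta registered, the same 1-second tumbling-window average used earlier can be run against the Delta-backed stream, for example from Scala via spark.sql. This is a sketch that simply reuses the earlier query with the new view name:
// Same 1-second tumbling-window average temperature, now over the Delta-backed stream
val deltaAvgDF = spark.sql("""
  SELECT
    date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'), "1 second").end, "HH:mm:ss") AS time,
    AVG(temperature) AS temperature
  FROM stream_view_delta
  GROUP BY date_format(window(from_utc_timestamp(current_timestamp, 'GMT+8'), "1 second").end, "HH:mm:ss")
""")

display(deltaAvgDF) // display() starts the streaming query in a Databricks notebook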