Learn Apache Spark and Scala with examples

First, if you have not installed Apache Spark yet, follow the guide here to install Spark on Windows 10.
Now download Bible.txt from here. We will use the Bible text for the analysis. I have placed the file at C:\install

//Example 1: Simple word count for bible

val testFile = sc.textFile("C:/install/bible.txt")
val counts = testFile.flatMap(_.split(" ")).map( x => (x , 1)). reduceByKey(_+_).sortByKey()
counts.count()
counts.collect()



/* Illustration: the same Apache Spark word count as a standalone application for an IDE such as Eclipse. You can follow the sample below:
*/
package com.devinline.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object WordCount {
  def main(args: Array[String]): Unit = {

    // Start the Spark context
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    // Read an example file into a test RDD
    val test = sc.textFile("file:///C:/Users/HASNAIN/Desktop/txt/dstxt.txt")

    test.flatMap { line => // for each line
      line.split(" ") // split the line into words
    }
      .map { word => // for each word
        (word, 1) // return a key/value tuple, with the word as key and 1 as value
      }
      .reduceByKey(_ + _) // sum all of the values with the same key
      .saveAsTextFile("file:///C:/Users/HASNAIN/Desktop/txt/output1.txt") // save as text; Spark creates a directory with this name

    // Stop the Spark context
    sc.stop()
  }

}

//Example 2: Find the top 10 word occurrences in the Bible after removing a sample set of stop words and the header

val testWords = Set("the", "and","for","that","are","with","this","can","them")
val testFile = sc.textFile("C:/install/bible.txt").cache()
val header = testFile.first()
testFile.filter(x => x != header).
flatMap(_.trim.split("""[\s\W]+""")).
filter(_.length > 2).
map(_.toLowerCase).
filter(!testWords.contains(_)).
map( x => (x , 1)).
reduceByKey(_+_).
sortBy(_._2,ascending=false).
take(10).
foreach{case (x,y) => println(f"$x%20s:$y%04d")}
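
To see what the tokenizing and formatting steps in Example 2 do without starting Spark, here is a minimal sketch on a plain Scala collection. The object name `TokenizeSketch` and the sample line are assumptions made up for illustration; only the regular expression, the stop word set and the format string come from the example above.

```scala
object TokenizeSketch {
  // Hypothetical sample line, used only for illustration
  val sampleLine = "  In the beginning, God created the heaven and the earth.  "

  // Same sample stop words as in Example 2
  val stopWords = Set("the", "and", "for", "that", "are", "with", "this", "can", "them")

  // Same steps as the RDD pipeline, applied to one local line
  def tokens(line: String): List[String] =
    line.trim.split("""[\s\W]+""").toList // split on runs of whitespace/punctuation
      .filter(_.length > 2)               // drop very short tokens such as "In"
      .map(_.toLowerCase)                 // normalize case before counting
      .filter(!stopWords.contains(_))     // drop stop words

  // %20s right-aligns the word in 20 columns, %04d zero-pads the count to 4 digits
  def format(word: String, count: Int): String = f"$word%20s:$count%04d"

  def main(args: Array[String]): Unit = {
    println(tokens(sampleLine)) // List(beginning, god, created, heaven, earth)
    println(format("god", 7))
  }
}
```

The trailing period produces no empty token because `split` discards trailing empty strings.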


//Example 3: Count the words that contain "god"

sc.textFile("C:/install/bible.txt").
flatMap(_.trim.split("""[\s\W]+""")).
filter(_.length > 2).
map(_.toLowerCase).
filter(_.contains("god")).map( x => (x , 1)).
reduceByKey(_+_).
sortBy(_._2,ascending=false).
take(50).
foreach{case (x,y) => println(f"$x%20s:$y%04d")}


//Example 4: Count the words that contain "good" or "evil"

sc.textFile("C:/install/bible.txt").
flatMap(_.trim.split("""[\s\W]+""")).
filter(_.length > 2).
map(_.toLowerCase).
filter(x => x.contains("good") || x.contains("evil")).map( x => (x , 1)).
reduceByKey(_+_).
sortBy(_._2,ascending=false).
take(50).
foreach{case (x,y) => println(f"$x%20s:$y%04d")}


//Example 5: Use combineByKey to calculate the average value per key in a list

val rdd = sc.parallelize(List(("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),("C", 32), ("C", 91),("C", 122), ("C", 3), ("C", 55)), 2)
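val rdd2 = rdd.combineByKey(
// createCombiner: the first value seen for a key becomes (sum, count)
(x:Int) => (x, 1),
// mergeValue: add another value for the key within the same partition
(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
// mergeCombiners: merge the partial (sum, count) pairs of different partitions
(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).
map{case(k,v) => (k, v._1.toDouble/v._2)}.collect()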
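
The three functions passed to combineByKey can be hard to picture, so here is a rough sketch that imitates them on plain Scala collections, without Spark. The object `CombineByKeySketch`, its helper names and the way the list is cut into two "partitions" are assumptions for illustration; Spark decides the real partitioning itself.

```scala
object CombineByKeySketch {
  type Acc = (Int, Int) // (running sum, running count)

  // The same three functions as in Example 5
  val createCombiner: Int => Acc = x => (x, 1)
  val mergeValue: (Acc, Int) => Acc = (acc, x) => (acc._1 + x, acc._2 + 1)
  val mergeCombiners: (Acc, Acc) => Acc = (a, b) => (a._1 + b._1, a._2 + b._2)

  // Step 1: fold the values of one partition into one accumulator per key
  def combinePartition(partition: Seq[(String, Int)]): Map[String, Acc] =
    partition.foldLeft(Map.empty[String, Acc]) { case (combined, (key, value)) =>
      val updated = combined.get(key) match {
        case Some(acc) => mergeValue(acc, value)
        case None      => createCombiner(value)
      }
      combined.updated(key, updated)
    }

  // Step 2: merge the per-partition accumulators key by key
  def mergePartitions(left: Map[String, Acc], right: Map[String, Acc]): Map[String, Acc] =
    right.foldLeft(left) { case (merged, (key, acc)) =>
      val updated = merged.get(key) match {
        case Some(existing) => mergeCombiners(existing, acc)
        case None           => acc
      }
      merged.updated(key, updated)
    }

  // Step 3: turn each (sum, count) into an average
  def averages(partitions: Seq[Seq[(String, Int)]]): Map[String, Double] =
    partitions.map(combinePartition)
      .foldLeft(Map.empty[String, Acc])(mergePartitions)
      .map { case (key, (sum, count)) => key -> (sum.toDouble / count) }

  def main(args: Array[String]): Unit = {
    // Hypothetical split of the Example 5 data into two partitions
    val p1 = Seq(("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4), ("B", 10))
    val p2 = Seq(("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55))
    println(averages(Seq(p1, p2)))
  }
}
```

With the data of Example 5 the averages come out as 5.8 for A, 14.0 for B and 60.6 for C. Key B is spread over both partitions here, so its result depends on the mergeCombiners step.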


Scala Examples:

Binary Search:
Search a sorted array by repeatedly dividing the search interval in half. Begin with an interval covering the whole array. If the value of the search key is less than the item in the middle of the interval, narrow the interval to the lower half. Otherwise narrow it to the upper half. Repeatedly check until the value is found or the interval is empty.

scala> def binarySearch(x:Array[Int],y:Int):Int = {
     | var start = 0
     | var end = x.length
     | var mid = (start + end)/2
     | while (end > start && x(mid) != y) {
     | if (y < x(mid)) end = mid else start = mid +1
     | mid = (start + end )/2
     | }
     | if (end > start ) mid else -1}
binarySearch: (x: Array[Int], y: Int)Int

scala> binarySearch(Array(1,2,3,4,5,6,7,8,9),8)
res14: Int = 7

scala> binarySearch(Array(1,2,3,4,5,6,7,8,9),18)
res15: Int = -1
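
The same halving idea can also be written without mutable variables. The following is only a sketch of one possible variant, not a replacement for the loop above: the object `BinarySearchSketch` and the choice to return an `Option[Int]` instead of -1 are my assumptions.

```scala
import scala.annotation.tailrec

object BinarySearchSketch {
  // Returns Some(index) if target is found in the sorted array, None otherwise
  def search(xs: Array[Int], target: Int): Option[Int] = {
    @tailrec
    def loop(start: Int, end: Int): Option[Int] =
      if (start >= end) None // the interval [start, end) is empty
      else {
        val mid = start + (end - start) / 2 // avoids overflow of start + end
        if (xs(mid) == target) Some(mid)
        else if (target < xs(mid)) loop(start, mid) // continue in the lower half
        else loop(mid + 1, end)                     // continue in the upper half
      }
    loop(0, xs.length)
  }

  def main(args: Array[String]): Unit = {
    println(search(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 8))  // Some(7)
    println(search(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 18)) // None
  }
}
```

The `@tailrec` annotation asks the compiler to verify that the recursion is compiled into a loop, so the stack does not grow with the array size.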

Bubble Sort:
The bubble sort makes multiple passes through a list. It compares adjacent items and exchanges those that are out of order. Each pass through the list places the next largest value in its proper place. In essence, each item “bubbles” up to the location where it belongs.

scala> def bubblesort(a:Array[Int]):Array[Int] = {
     | for (i <-0 until a.length-1; j <-0 until a.length-1-i) {
     | if (a(j) > a(j+1)) {
     | val temp = a(j+1)
     | a(j+1) = a(j)
     | a(j) = temp
     | }
     | }
     |   a
     | }
bubblesort: (a: Array[Int])Array[Int]

scala> bubblesort(Array(13,56,3,4,0,4,5,5,6,7,8,4,0,4,43,2))
res16: Array[Int] = Array(0, 0, 2, 3, 4, 4, 4, 4, 5, 5, 6, 7, 8, 13, 43, 56)
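
The bubblesort above always runs every pass, even on sorted input. The comparison further down lists O(n) as the best case for bubble sort, which requires stopping as soon as a pass makes no swap. Here is a minimal sketch of that variant; the object `BubbleSortEarlyExit` and the returned pass counter are assumptions added for illustration.

```scala
object BubbleSortEarlyExit {
  // Returns a sorted copy of the input together with the number of passes made
  def sort(input: Array[Int]): (Array[Int], Int) = {
    val a = input.clone() // leave the caller's array untouched
    var passes = 0
    var swapped = true
    var unsortedEnd = a.length - 1 // last index that may still be out of place
    while (swapped && unsortedEnd > 0) {
      swapped = false
      passes += 1
      for (j <- 0 until unsortedEnd) {
        if (a(j) > a(j + 1)) {
          val temp = a(j + 1)
          a(j + 1) = a(j)
          a(j) = temp
          swapped = true
        }
      }
      unsortedEnd -= 1 // the largest remaining value is now in place
    }
    (a, passes)
  }

  def main(args: Array[String]): Unit = {
    println(sort(Array(1, 2, 3, 4, 5))._2) // 1 pass: already sorted
    println(sort(Array(5, 4, 3, 2, 1))._2) // 4 passes: reverse order
  }
}
```

On already sorted input the loop stops after a single pass of n-1 comparisons, which is where the O(n) best case comes from.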


Selection Sort:
In a selection sort, the unsorted part of the data is searched for the minimum (or maximum) element, which is then swapped into its final position at the boundary of the sorted part. As with a bubble sort, the range still to be sorted shrinks by one element after each pass while a sorted sublist is built up (at the front when selecting the minimum, as in the code below, or at the end when selecting the maximum). Both algorithms perform O(n^2) comparisons, but selection sort usually runs faster because it makes at most one swap per pass, so at most n-1 swaps in total.

scala> def minsort(a:Array[Int]):Array[Int] = {
     | for (i <-0 until a.length-1) {
     | var min = i
     | for (j <- i+1 until a.length){
     | if (a(j) < a(min)) min =j
     | }
     | if (min !=i) {
     | val temp = a(i)
     | a(i) = a(min)
     | a(min) = temp
     | }
     | }
     | a
     | }
minsort: (a: Array[Int])Array[Int]

scala> minsort(Array(13,56,3,4,0,4,5,5,6,7,8,4,0,4,43,2))
res17: Array[Int] = Array(0, 0, 2, 3, 4, 4, 4, 4, 5, 5, 6, 7, 8, 13, 43, 56)
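
To make the "fewer swaps" point concrete, the two sorts can be instrumented with a swap counter. This is only a sketch: the object `SwapCountSketch` and its method names are assumptions, and the loops are copies of bubblesort and minsort above working on a clone of the input.

```scala
object SwapCountSketch {
  private def swap(a: Array[Int], i: Int, j: Int): Unit = {
    val temp = a(i)
    a(i) = a(j)
    a(j) = temp
  }

  // Number of swaps bubble sort performs on the input
  def bubbleSwaps(input: Array[Int]): Int = {
    val a = input.clone()
    var swaps = 0
    for (i <- 0 until a.length - 1; j <- 0 until a.length - 1 - i) {
      if (a(j) > a(j + 1)) { swap(a, j, j + 1); swaps += 1 }
    }
    swaps
  }

  // Number of swaps selection sort performs on the input
  def selectionSwaps(input: Array[Int]): Int = {
    val a = input.clone()
    var swaps = 0
    for (i <- 0 until a.length - 1) {
      var min = i
      for (j <- i + 1 until a.length) {
        if (a(j) < a(min)) min = j
      }
      if (min != i) { swap(a, i, min); swaps += 1 }
    }
    swaps
  }

  def main(args: Array[String]): Unit = {
    val reversed = Array(5, 4, 3, 2, 1)
    println(bubbleSwaps(reversed))    // 10
    println(selectionSwaps(reversed)) // 2
  }
}
```

For the reversed array bubble sort needs one swap per out-of-order pair (10 in total), while selection sort gets by with 2 and can never need more than n-1.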


| Basis for comparison | Bubble sort | Selection sort |
|---|---|---|
| Basic | Adjacent elements are compared and swapped | Largest element is selected and swapped with the last element (in case of ascending order) |
| Best case time complexity | O(n) (with an early-exit check when a pass makes no swaps) | O(n^2) |
| Efficiency | Inefficient | Improved efficiency as compared to bubble sort |
| Stable | Yes | No |
| Method | Exchanging | Selection |
| Speed | Slow | Fast as compared to bubble sort |
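
The "Stable" entry of the comparison can be checked with a small experiment: sort pairs by their number only and see whether pairs with equal numbers keep their original order. The sketch below assumes a hypothetical `StabilitySketch` object and made-up sample data; the loops follow bubblesort and minsort from above.

```scala
object StabilitySketch {
  type Item = (Int, String) // (sort key, label that records the original order)

  // Bubble sort comparing keys only; equal keys are never swapped
  def bubbleSortByKey(input: Array[Item]): Array[Item] = {
    val a = input.clone()
    for (i <- 0 until a.length - 1; j <- 0 until a.length - 1 - i) {
      if (a(j)._1 > a(j + 1)._1) {
        val temp = a(j + 1)
        a(j + 1) = a(j)
        a(j) = temp
      }
    }
    a
  }

  // Selection sort comparing keys only; the long-distance swap can reorder equal keys
  def selectionSortByKey(input: Array[Item]): Array[Item] = {
    val a = input.clone()
    for (i <- 0 until a.length - 1) {
      var min = i
      for (j <- i + 1 until a.length) {
        if (a(j)._1 < a(min)._1) min = j
      }
      if (min != i) {
        val temp = a(i)
        a(i) = a(min)
        a(min) = temp
      }
    }
    a
  }

  def main(args: Array[String]): Unit = {
    val data = Array((2, "a"), (2, "b"), (1, "c")) // hypothetical sample data
    println(bubbleSortByKey(data).toList)    // List((1,c), (2,a), (2,b))
    println(selectionSortByKey(data).toList) // List((1,c), (2,b), (2,a))
  }
}
```

Bubble sort keeps (2,a) before (2,b), while selection sort moves (2,a) behind (2,b) when it swaps the minimum to the front.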
