The Real A Team

Using Apache Spark

This assignment is dedicated to learning how to use Apache Spark. I chose the programming language Scala because Spark itself is written in Scala.

What Is Scala?

Info on Scala can be found at http://www.scala-lang.org/ but in short, Scala is a programming language that was created to build scalable applications. It compiles to Java bytecode, runs on the Java virtual machine (JVM), and can use existing Java libraries directly.

IDE Used

For this assignment the Scala IDE was used. It can be found here: http://scala-ide.org/ The IDE is a version of Eclipse with Scala built in.

A Team Members

  1. Adrian Sauvageot, All
  2. ...

PreAssignment

This pre-assignment goes over how to do a simple word count in Scala using Spark.

Introduction To Spark

To introduce myself to Spark I watched a series of YouTube videos that were filmed at a Spark conference. The videos can be found here: https://www.youtube.com/watch?v=nxCm-_GdTl8 The series has 7 videos that go over what Spark is and how it is meant to be used.

In summary, Spark is built to process big data across many machines. It is not meant for handling many small transactions, but for the analysis of data.

Spark is built around the RDD (Resilient Distributed Dataset). An RDD is immutable: Spark does not edit the data that is passed in, but uses it to perform transformations (filters, joins, maps, etc.) and then actions (reductions, counts, etc.).

The results are stored into new datasets instead of altering existing ones.

RDDs are meant to be kept in memory for quick access; however, Spark is built so that if necessary they can be written to disk, at a reduced I/O speed.
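As a rough sketch of how this is requested in code, the snippet below persists an RDD with a storage level that keeps partitions in memory and spills to disk when they do not fit. It assumes the Spark 1.6 core library is on the classpath; the object name PersistSketch, the helper counts, and the range of numbers are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Returns (how many numbers, how many even numbers) using one persisted RDD.
  def counts(sc: SparkContext): (Long, Long) = {
    // Illustrative data only: the integers 1 to 1,000,000.
    val numbers = sc.parallelize(1 to 1000000)

    // Keep partitions in memory and spill whatever does not fit to disk.
    numbers.persist(StorageLevel.MEMORY_AND_DISK)

    val total = numbers.count()                    // first action computes and stores the partitions
    val evens = numbers.filter(_ % 2 == 0).count() // second action reuses the stored partitions

    numbers.unpersist()
    (total, evens)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PersistSketch").setMaster("local"))
    println(counts(sc))
    sc.stop()
  }
}
```

Without the persist call the program returns the same numbers, but each action recomputes the RDD from its source.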

A Spark program therefore has three main steps: creation, transformation, and action.
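The three steps can be sketched on a tiny in-memory dataset. This is only an illustration, assuming Spark 1.6 on the classpath; the object name ThreeSteps, the helper countLongWords, and the sample words are hypothetical. Note that transformations are lazy: nothing is computed until an action is called.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ThreeSteps {
  // Counts the sample words that are longer than three characters.
  def countLongWords(sc: SparkContext): Long = {
    // Creation: build the first RDD from a local collection.
    val words = sc.parallelize(Seq("apple", "pear", "apple", "fig"))

    // Transformation: describes a new RDD; nothing runs yet and `words` is not modified.
    val longWords = words.filter(_.length > 3)

    // Action: triggers the computation and returns a result to the driver.
    longWords.count()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ThreeSteps").setMaster("local"))
    println(countLongWords(sc)) // "apple", "pear", "apple" pass the filter, so this prints 3
    sc.stop()
  }
}
```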

Setting Up The Scala Environment For Spark

To run a standalone application on Windows using the Scala IDE, you need to create a Maven project. You do this by clicking File > New > Project > Maven Project. Once the project is created, to use Scala instead of Java, the source folder should be renamed (refactored) from src/main/java to src/main/scala.

From here you need to edit the pom.xml file to include Spark.

You can do this by adding the following code:

<dependencies>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.10</artifactId>
		<version>1.6.0</version>
	</dependency>
</dependencies>

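When the project is built it will include the Apache Spark files. The _2.10 suffix in the artifact ID is the Scala version the Spark artifact was built for, so it should match the Scala version used by the project.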

Imports In Object

Like Java, Scala uses both classes and objects. To use Spark inside them, you must import it by adding the following two lines to the top of the file.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext


Windows Bug

There is a known problem on Windows where Spark will not run unless it can find Hadoop's Windows helper executable (winutils.exe). This happens even though a Hadoop installation is not otherwise required for Scala or Spark. A workaround is to supply that one Hadoop exe file to your program.

To do this, I downloaded the pre-compiled Hadoop exe and put it on my C: drive. I then added the following line to my program, before the SparkContext is created, to tell Spark where to find the exe.


    System.setProperty("hadoop.home.dir", "c:\\winutil\\")
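If the property points at the wrong folder, the same error comes back, so it can help to check for the exe first. The sketch below assumes the usual layout in which Hadoop looks for bin\winutils.exe underneath the folder named by hadoop.home.dir; the object name HadoopHomeCheck and the helper hasWinutils are made up for illustration.

```scala
import java.io.File

object HadoopHomeCheck {
  // True if <home>/bin/winutils.exe exists (assumed layout, see the note above).
  def hasWinutils(home: String): Boolean =
    new File(new File(home, "bin"), "winutils.exe").isFile

  def main(args: Array[String]): Unit = {
    val home = "c:\\winutil\\"
    if (hasWinutils(home)) {
      // Only set the property when the exe is really there.
      System.setProperty("hadoop.home.dir", home)
    } else {
      println("winutils.exe not found in the bin folder under " + home)
    }
  }
}
```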



Creation

The creation step is the step in which the data is loaded into the first RDD. This data can be loaded from multiple sources, but for the purpose of this assignment it is loaded from a local text file. In Scala, the following line loads a text file into an RDD with one element per line (sc is the SparkContext).

val test = sc.textFile("food.txt")

Transformation

The next step is to transform the data that is in the first RDD into something you want to use. You can do this by using filter, map, union, join, sort, etc. In this example we would like to count the number of times each word is used in a piece of text. The following code snippet splits each line on spaces and pairs each word (the key) with a value. The value is set to 1 so that each occurrence of a word counts as 1.

test.flatMap { line =>
  line.split(" ")
}
.map { word =>
  (word, 1)
}
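To see what these two transformations produce, the same calls can be tried on a plain Scala collection, which has flatMap and map methods with the same meaning. This is only an illustration without Spark; the object name TransformSketch, the helper toPairs, and the sample lines are made up.

```scala
object TransformSketch {
  // The same two steps as above, applied to a local Seq instead of an RDD.
  def toPairs(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(line => line.split(" ")) // one element per word
      .map(word => (word, 1))           // pair every word with the value 1

  def main(args: Array[String]): Unit = {
    val sample = Seq("apple pie", "apple tart")
    println(toPairs(sample)) // List((apple,1), (pie,1), (apple,1), (tart,1))
  }
}
```

Each occurrence of a word gets its own pair, so "apple" appears twice at this stage.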

Action

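Finally an action is taken to give the programmer the desired output, for example count, collect, reduce, lookup, save. In this example we want to add up the values for each distinct word. Strictly speaking, reduceByKey is itself a transformation that returns a new RDD; the action that triggers the work in the full program is saveAsTextFile. To do the summing we can use: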

.reduceByKey(_ + _)
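As a small sketch of the summing step, the snippet below applies reduceByKey to a handful of hand-written pairs and uses collect as the action to bring the result back to the driver. It assumes Spark 1.6 on the classpath; the object name ActionSketch, the helper wordCounts, and the sample pairs are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ActionSketch {
  // Sums the values of each key and returns the result as a local Map.
  def wordCounts(sc: SparkContext): Map[String, Int] = {
    val pairs = sc.parallelize(Seq(("apple", 1), ("pie", 1), ("apple", 1), ("tart", 1)))

    pairs
      .reduceByKey(_ + _) // adds the values that share a key
      .collect()          // action: runs the job and returns an Array to the driver
      .toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ActionSketch").setMaster("local"))
    println(wordCounts(sc)("apple")) // the two "apple" pairs add up to 2
    sc.stop()
  }
}
```

collect is only sensible for results small enough to fit on the driver; for large outputs, saving to files is the safer choice.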

Full Program

The following is the complete program, including the main method, as run on Windows.

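import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  def main(args: Array[String]): Unit = {

    // Windows workaround: tell Spark where the Hadoop exe lives
    System.setProperty("hadoop.home.dir", "c:\\winutil\\")

    // Run locally under the application name "WordCount"
    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local")

    val sc = new SparkContext(conf)

    // Creation: one RDD element per line of the file
    val test = sc.textFile("food.txt")

    // Transformation: split lines into words, pair each word with 1,
    // and sum the 1s per word. Action: save the (word, count) pairs.
    // Spark writes the output as a directory named food.count.txt.
    test.flatMap { line =>
      line.split(" ")
    }
    .map { word =>
      (word, 1)
    }
    .reduceByKey(_ + _)
    .saveAsTextFile("food.count.txt")

    sc.stop()
  }
}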


Assignment

For the assignment, I wanted to create a program that compares two pieces of text and tries to determine whether they were written by the same person. To do this I take the two pieces of text and compare how they were written. This includes looking at the words per sentence, sentences per paragraph, characters per word, commas per sentence, and colons per paragraph.
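The comparison in the full program comes down to summing the absolute differences of these five averages and checking the total against a threshold of 5. A minimal sketch of just that step, without Spark, is shown here; the names StyleDiff, Style, diff, and sameAuthor are made up, and the sample numbers are invented for illustration, not measured from any text.

```scala
object StyleDiff {
  // The five averages compared by the program.
  case class Style(wordsPerSentence: Double, wordLength: Double,
                   sentencesPerParagraph: Double, commasPerSentence: Double,
                   colonsPerParagraph: Double)

  // Sum of the absolute differences of the five averages.
  def diff(a: Style, b: Style): Double =
    math.abs(a.wordsPerSentence - b.wordsPerSentence) +
    math.abs(a.wordLength - b.wordLength) +
    math.abs(a.sentencesPerParagraph - b.sentencesPerParagraph) +
    math.abs(a.commasPerSentence - b.commasPerSentence) +
    math.abs(a.colonsPerParagraph - b.colonsPerParagraph)

  // Same cut-off as the full program: a total of 5 or less counts as the same author.
  def sameAuthor(a: Style, b: Style): Boolean = diff(a, b) <= 5

  def main(args: Array[String]): Unit = {
    val first = Style(15.0, 4.5, 4.0, 1.0, 0.5)  // invented sample values
    val second = Style(17.0, 4.0, 3.0, 1.5, 0.5) // invented sample values
    println(diff(first, second))                 // 2.0 + 0.5 + 1.0 + 0.5 + 0.0 = 4.0
    println(sameAuthor(first, second))           // true
  }
}
```

Because the five averages are on different scales, words per sentence tends to dominate the total; weighting or normalizing the terms would be one possible refinement.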

Full Program

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TextCompare {
  def main(args: Array[String]): Unit = {
    
    System.setProperty("hadoop.home.dir", "c:\\winutil\\")

    val conf = new SparkConf()
      .setAppName("TextCompare")
      .setMaster("local")
      
    val sc = new SparkContext(conf)
    
    
    
    val text1 = sc.textFile("text1.txt")

    val text2 = sc.textFile("text2.txt")

    //GET START TIME
    val time1 = java.lang.System.currentTimeMillis()
    
    //TEXT ONE
    val t1WordCount = text1.flatMap(line => line.split(" ")).count()
    
    val t1SplitChars = text1.flatMap(line => line.split(""))

    val t1SentenceCount = t1SplitChars.filter(character => character.matches("[.!?]")).count()
    val t1CommaCount = t1SplitChars.filter(character => character.matches("[,]")).count()
    val t1ColonCount = t1SplitChars.filter(character => character.matches("[:;]")).count()
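    // Count letters only, the same way as for text two, so the averages are comparable
    val t1CharacterCount = t1SplitChars.filter(character => character.matches("[a-zA-Z]")).count()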
    val t1ParagraphCount = text1.count()
    
    val t1AverageWordsPerSentence = t1WordCount/1.0/t1SentenceCount
    val t1AverageWordLength = t1CharacterCount/1.0/t1WordCount
    val t1AverageSentencePerParagraph = t1SentenceCount/1.0/t1ParagraphCount
    val t1AverageCommasPerSentence = t1CommaCount/1.0/t1SentenceCount
    val t1AverageColonsPerParagraph = t1ColonCount/1.0/t1ParagraphCount

    //TEXT TWO
    val t2WordCount = text2.flatMap(line => line.split(" ")).count()
    
    val t2SplitChars = text2.flatMap(line => line.split(""))

    val t2SentenceCount = t2SplitChars.filter(character => character.matches("[.!?]")).count()
    val t2CommaCount = t2SplitChars.filter(character => character.matches("[,]")).count()
    val t2ColonCount = t2SplitChars.filter(character => character.matches("[:;]")).count()
    val t2CharacterCount = t2SplitChars.filter(character => character.matches("[a-zA-Z]")).count()
    val t2ParagraphCount = text2.count()
    
    //GET END TIME
    val time2 = java.lang.System.currentTimeMillis()
    
    
    val t2AverageWordsPerSentence = t2WordCount/1.0/t2SentenceCount
    val t2AverageWordLength = t2CharacterCount/1.0/t2WordCount
    val t2AverageSentencePerParagraph = t2SentenceCount/1.0/t2ParagraphCount
    val t2AverageCommasPerSentence = t2CommaCount/1.0/t2SentenceCount
    val t2AverageColonsPerParagraph = t2ColonCount/1.0/t2ParagraphCount
    
    val totalDiff = 
    Math.abs(t1AverageWordsPerSentence - t2AverageWordsPerSentence)+
	  Math.abs(t1AverageWordLength - t2AverageWordLength)+
	  Math.abs(t1AverageSentencePerParagraph - t2AverageSentencePerParagraph)+
	  Math.abs(t1AverageCommasPerSentence - t2AverageCommasPerSentence)+
	  Math.abs(t1AverageColonsPerParagraph - t2AverageColonsPerParagraph)
	  
	  println("Words: "+t1WordCount+"::"+t2WordCount)
	  println("Sentences: "+t1SentenceCount+"::"+t2SentenceCount)
	  println("Commas: "+t1CommaCount+"::"+t2CommaCount)
	  println("Colons: "+t1ColonCount+"::"+t2ColonCount)
	  println("Characters: "+t1CharacterCount+"::"+t2CharacterCount)
	  println("Paragraphs: "+t1ParagraphCount+"::"+t2ParagraphCount)
	  
    
	  println("Words/sentence: "+t1AverageWordsPerSentence+"::"+t2AverageWordsPerSentence)
	  println("Word length: "+t1AverageWordLength+"::"+t2AverageWordLength)
	  println("Sentence/paragraph: "+t1AverageSentencePerParagraph+"::"+t2AverageSentencePerParagraph)
	  println("Commas/sentence: "+t1AverageCommasPerSentence+"::"+t2AverageCommasPerSentence)
	  println("Colons/paragraph: "+t1AverageColonsPerParagraph+"::"+t2AverageColonsPerParagraph)
	  
	  // totalDiff is a sum of absolute differences, not a percentage
	  println("The two pieces have a style difference score of: " + totalDiff)
	  
	  if (totalDiff <= 5){
	    println(">> It is likely this was written by the same person.")
	  }else{
	    println(">> It is likely this was written by two different people.")
	  }
	  
    println(">>>> Total Time Taken is: "+(time2-time1)+"ms")
    
    sc.stop()
  }
}
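In the program above every count() is a separate action, so Spark runs a separate job for each one and, since nothing is cached, works through the text again each time. One possible alternative, sketched here for text one only, is to gather all six counts in a single pass with aggregate. This is not the version that was timed; the names SinglePassCounts, Counts, and countLine are made up, and the counting rules are meant to mirror the ones above, with letters only for the character count.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SinglePassCounts {
  case class Counts(words: Long, sentences: Long, commas: Long,
                    colons: Long, letters: Long, paragraphs: Long) {
    // Field-by-field sum, used to merge partial results.
    def +(o: Counts): Counts = Counts(
      words + o.words, sentences + o.sentences, commas + o.commas,
      colons + o.colons, letters + o.letters, paragraphs + o.paragraphs)
  }

  // Counts for a single line; each line is treated as one paragraph.
  def countLine(line: String): Counts = Counts(
    words = line.split(" ").length,
    sentences = line.count(c => c == '.' || c == '!' || c == '?'),
    commas = line.count(_ == ','),
    colons = line.count(c => c == ':' || c == ';'),
    letters = line.count(c => (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')),
    paragraphs = 1)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SinglePassCounts").setMaster("local"))
    val text1 = sc.textFile("text1.txt")

    // One action: fold each line into a running total, then merge the per-partition totals.
    val zero = Counts(0, 0, 0, 0, 0, 0)
    val totals = text1.aggregate(zero)((acc, line) => acc + countLine(line), _ + _)

    println(totals)
    sc.stop()
  }
}
```

A lighter-weight option would be to call cache() on the RDDs that are counted more than once and keep the rest of the program unchanged.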


Compare To CPP + OpenMP

Spark is not meant to be run with small data on a single machine, but just for fun, I decided to compare it to a program written in C++ and optimized using OpenMP.

I took six pieces of text (three pairs of different sizes) to compare, and to see how fast each pair would be processed. Without terabytes of data to use, and without a network of computers to test on, I ran these relatively small pieces of text on a single computer.


File Size   Text one characters   Text two characters   Combined
Small       2524                  2473                  4997
Medium      6992                  4344                  11336
Large       120375                129858                250233


When I ran the programs I had the following results:


File Size   CPP (ms)   CPP+OpenMP (ms)   Scala+Spark (ms)
Small       3          21                13539
Medium      8          26                13551
Large       157        50                14135

 

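What this shows is that the overhead of each parallelization technique is important to consider. OpenMP is slower than the serial C++ version on the small and medium files (21 ms versus 3 ms, and 26 ms versus 8 ms) and only pays off on the large file (50 ms versus 157 ms). Scala+Spark is meant to run across multiple computers, and with setMaster("local") it runs on a single worker thread, so on one machine there is nothing to offset its fixed cost: every run takes roughly 13.5 to 14 seconds, and as you can see in the table above, the time increases only slightly with the file size.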
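The ratios behind this can be worked out directly from the table. The short sketch below does the arithmetic in plain Scala; the object name OverheadMath and the helper speedup are made up, and the numbers are copied from the results table.

```scala
object OverheadMath {
  // (file size, serial C++ ms, C++ + OpenMP ms, Scala + Spark ms) from the results table.
  val results = Seq(
    ("Small", 3.0, 21.0, 13539.0),
    ("Medium", 8.0, 26.0, 13551.0),
    ("Large", 157.0, 50.0, 14135.0))

  // A value above 1 means the other version is faster than the serial one.
  def speedup(serialMs: Double, otherMs: Double): Double = serialMs / otherMs

  def main(args: Array[String]): Unit =
    for ((label, cpp, omp, spark) <- results)
      println(f"$label%-6s OpenMP speedup: ${speedup(cpp, omp)}%.2f   Spark/serial ratio: ${spark / cpp}%.0f")
}
```

For the large file the OpenMP speedup is 157 / 50 = 3.14, while for the small file it is 3 / 21, well below 1.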

CPP Code

#include <iostream>
#include <fstream>
#include <cstring>
#include <cmath>     // std::abs for double
#include <iterator>  // std::istreambuf_iterator
#include <chrono>
#include <string>

#include <omp.h>

using namespace std::chrono;

// report system time
//
void reportTime(const char* msg, steady_clock::duration span) {
	auto ms = duration_cast<milliseconds>(span);
	std::cout << std::endl << msg << " - took - " <<
		ms.count() << " milliseconds" << std::endl;
}

// serial version: pass the text by const reference to avoid copying it on every call
void processString(int len, const std::string& str, int& totalSentenceCount, int& totalWordCount, int& totalCharCount, int& totalParagraphCount, int& totalCommas, int& totalColons, double& averageWordsPerSentence, double& averageWordLength, double& averageSentencePerParagraph, double& averageCommasPerSentence, double& averageColonsPerParagraph){
	for (int i = 0; i < len; i++){
		if (str[i] == '.' || str[i] == '!' || str[i] == '?' || str[i] == ' ' || str[i] == '\r' || str[i] == '\n' || str[i] == '\t' || str[i] == ':' || str[i] == ';' || str[i] == ',') {
			if (str[i] == '.' || str[i] == '!' || str[i] == '?'){
				totalSentenceCount++;
			}else if(str[i] == '\n'){
				totalParagraphCount++;
			}
			else if (str[i] == ','){
				totalCommas++;
			}
			else if (str[i] == ':' || str[i] == ';'){
				totalColons++;
			}
			else{
				totalWordCount++;
			}
		}
		else{
			totalCharCount++;
		}
	}
	averageWordsPerSentence = totalWordCount / 1.0 / totalSentenceCount;
	averageWordLength = totalCharCount / 1.0 / totalWordCount;
	averageSentencePerParagraph = totalSentenceCount / 1.0 / totalParagraphCount;
	averageCommasPerSentence = totalCommas / 1.0 / totalSentenceCount;
	averageColonsPerParagraph = totalColons / 1.0 / totalParagraphCount;

}

// OpenMP version: same counting rules, with the loop split across threads
void processStringOMP(int len, const std::string& str, int& totalSentenceCount, int& totalWordCount, int& totalCharCount, int& totalParagraphCount, int& totalCommas, int& totalColons, double& averageWordsPerSentence, double& averageWordLength, double& averageSentencePerParagraph, double& averageCommasPerSentence, double& averageColonsPerParagraph){
	int twc = 0, tsc = 0, tpc=0, tc= 0, tco=0, tcc = 0;
#pragma omp parallel for reduction(+:twc,tsc,tpc,tc,tco,tcc)
	for (int i = 0; i < len; i++){
		if (str[i] == '.' || str[i] == '!' || str[i] == '?' || str[i] == ' ' || str[i] == '\r' || str[i] == '\n' || str[i] == '\t' || str[i] == ':' || str[i] == ';' || str[i] == ',') {
			if (str[i] == '.' || str[i] == '!' || str[i] == '?'){
				tsc++;
			}
			else if (str[i] == '\n'){
				tpc++;
			}
			else if (str[i] == ','){
				tc++;
			}
			else if (str[i] == ':' || str[i] == ';'){
				tco++;
			}
			else{
				twc++;
			}
		}
		else{
			tcc++;
		}
	}
	totalWordCount = twc;
	totalSentenceCount = tsc;
	totalParagraphCount = tpc;
	totalCommas = tc;
	totalColons = tco;
	totalCharCount = tcc;

	averageWordsPerSentence = totalWordCount / 1.0 / totalSentenceCount;
	averageWordLength = totalCharCount / 1.0 / totalWordCount;
	averageSentencePerParagraph = totalSentenceCount / 1.0 / totalParagraphCount;
	averageCommasPerSentence = totalCommas / 1.0 / totalSentenceCount;
	averageColonsPerParagraph = totalColons / 1.0 / totalParagraphCount;

}

int main(int argc, char** argv) {

	if (argc != 3) {
		std::cerr << "*** Incorrect number of arguments ***\n";
		std::cerr << "Usage: " << argv[0] << " first_file second_file\n";
		return 1;
	}

	std::ifstream t1(argv[1]);
	std::string str1((std::istreambuf_iterator<char>(t1)),
		(std::istreambuf_iterator<char>()));

	std::ifstream t2(argv[2]);
	std::string str2((std::istreambuf_iterator<char>(t2)),
		(std::istreambuf_iterator<char>()));

	int len1 = str1.size();
	int len2 = str2.size();

	//define all required variables
	int totalWordCount1 = 0, totalWordCount2 = 0,
		totalCharCount1 = 0, totalCharCount2 = 0,
		totalSentenceCount1 = 0, totalSentenceCount2 = 0,
		totalParagraphCount1 = 0, totalParagraphCount2 = 0,
		totalCommas1 = 0, totalCommas2 = 0,
		totalColons1 = 0, totalColons2 = 0;
	double averageWordsPerSentence1, averageWordsPerSentence2,
		averageWordLength1, averageWordLength2,
		averageSentencePerParagraph1, averageSentencePerParagraph2,
		averageCommasPerSentence1, averageCommasPerSentence2,
		averageColonsPerParagraph1, averageColonsPerParagraph2;


	steady_clock::time_point ts, te;

	//time start
	ts = steady_clock::now();

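	//Start counting first file.
	//(processStringOMP takes the same arguments; presumably it was called here instead for the OpenMP timings.)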
	processString(len1, str1, totalSentenceCount1, totalWordCount1, totalCharCount1, totalParagraphCount1, totalCommas1, totalColons1, averageWordsPerSentence1, averageWordLength1, averageSentencePerParagraph1, averageCommasPerSentence1, averageColonsPerParagraph1);

	//Start counting second file.
	processString(len2, str2, totalSentenceCount2, totalWordCount2, totalCharCount2, totalParagraphCount2, totalCommas2, totalColons2, averageWordsPerSentence2, averageWordLength2, averageSentencePerParagraph2, averageCommasPerSentence2, averageColonsPerParagraph2);

	//time end
	te = steady_clock::now();

	std::cout << "The first text has  : " << totalCharCount1 << " characters, " << totalWordCount1 << " words, " << totalSentenceCount1 << " sentences, " << std::endl;
	std::cout << totalParagraphCount1 << " paragraphs, " << totalCommas1 << " commas, " << totalColons1 << " colons, " << std::endl;
	std::cout << averageWordsPerSentence1 << " average words/sentence, " << averageWordLength1 << " average characters/word," << std::endl;
	std::cout << averageSentencePerParagraph1 << " average sentences/paragraph, " << averageCommasPerSentence1 << " average commas/sentence, " << std::endl;
	std::cout << averageColonsPerParagraph1 << " average colons/paragraph. " << std::endl << std::endl;
	
	std::cout << "The second text has : " << totalCharCount2 << " characters, " << totalWordCount2 << " words, " << totalSentenceCount2 << " sentences, " << std::endl;
	std::cout << totalParagraphCount2 << " paragraphs, " << totalCommas2 << " commas, " << totalColons2 << " colons, " << std::endl;
	std::cout << averageWordsPerSentence2 << " average words/sentence, " << averageWordLength2 << " average characters/word," << std::endl;
	std::cout << averageSentencePerParagraph2 << " average sentences/paragraph, " << averageCommasPerSentence2 << " average commas/sentence, " << std::endl;
	std::cout << averageColonsPerParagraph2 << " average colons/paragraph. " << std::endl << std::endl;

	//how similar is the writing?
	double totalDiff = 0;
	totalDiff += std::abs(averageWordsPerSentence1 - averageWordsPerSentence2);
	totalDiff += std::abs(averageWordLength1 - averageWordLength2);
	totalDiff += std::abs(averageSentencePerParagraph1 - averageSentencePerParagraph2);
	totalDiff += std::abs(averageCommasPerSentence1 - averageCommasPerSentence2);
	totalDiff += std::abs(averageColonsPerParagraph1 - averageColonsPerParagraph2);
		
	// totalDiff is a sum of absolute differences, not a percentage
	std::cout << "The two pieces have a style difference score of: " << totalDiff << std::endl << std::endl;
	if (totalDiff <= 5){
		std::cout << ">> It is likely this was written by the same person." << std::endl;
	}
	else{
		std::cout << ">> It is likely this was written by two different people." << std::endl;
	}

	reportTime("Test", te - ts);
		
	std::cout << "\n\nPress Enter To Exit...";
	std::cin.get();

}


Thoughts On Spark

Spark is great for processing big data, but not so much for smaller data that can be processed on a single machine, where its start-up overhead dominates. I can see why Spark is growing in popularity for big data: it is easy to use and easy to integrate.

Thoughts On Scala

Scala is surprisingly easy to learn. It runs on the JVM and will feel familiar to anyone who knows Java, and there is a large community using it, so there is a lot of support out there and most questions you have will already be answered online.

How Does This Help You?

There are a lot of jobs, and there is a lot of money, in big data. The average salary of a Sr. Scala developer is $152,000, the average salary of a Sr. Java developer is $88,000, and the average salary of a Sr. C/C++ developer is $89,000.

Is it worth learning Scala?