Open main menu

CDOT Wiki β

Changes

The Real A Team

12,564 bytes added, 12:31, 7 April 2016
Introduction To Spark
==PreAssignment==
This assignment will be going over how to do a simple word count in Scala using Spark.
 
===Introduction To Spark===
To introduce myself to spark I watched a couple of youtube videos that were filmed at a Spark conference. The videos can be found here: https://www.youtube.com/watch?v=nxCm-_GdTl8 They include 7 videos that go over what Spark is, and how it is meant to be used.
 
In summery Spark is built to run big data across many machines. It is not meant for many transactions, but is meant for the analysis of data.
 
Spark is built using a RDD (Resilient Distributed Dataset). This means that Spark does not edit the data that is passed in, but rather used the data to preform transformations (filters, joins, maps, etc.) and then actions (reductions, counts, etc.)
 
The results are stored into new datasets instead of altering existing ones.
 
The RDD's are meant to be stored in memory for quick access, however Spark is built so that if necessary the RDD's can be written to the drive. (At a reduced IO speed).
 
As mentioned above, there are three main steps in a Spark program. Creation, Transformation, Action.
====Setting Up The Scala Environment For Spark====
To run a stand alone application on windows, using the Scala IDE, you need to create a Marven Maven project. You do this by clicking File > new > Project > Marven Maven Project. Once the project is created, to use Scala instead of Java, the name of your source file should be refactored from src/main/java to src/main/scala.
From here you need to edit the pom.xml file to include Spark.
</nowiki>
 
===Introduction To Spark===
To introduce myself to spark I watched a couple of youtube videos that were filmed at a Spark conference. The videos can be found here: https://www.youtube.com/watch?v=nxCm-_GdTl8 They include 7 videos that go over what Spark is, and how it is meant to be used.
 
In summery Spark is built to run big data across many machines. It is not meant for many transactions, but is meant for the analysis of data.
 
Spark is built using a RDD (Resilient Distributed Dataset). This means that Spark does not edit the data that is passed in, but rather used the data to preform transformations (filters, joins, maps, etc.) and then actions (reductions, counts, etc.)
 
The results are stored into new datasets instead of altering existing ones.
 
The RDD's are meant to be stored in memory for quick access, however Spark is built so that if necessary the RDD's can be written to the drive. (At a reduced IO speed).
 
As mentioned above, there are three main steps in a Spark program. Creation, Transformation, Action.
===Full Program===
<nowiki>
object WordCount TextCompare {
def main(args: Array[String]) ={
val sc = new SparkContext(conf)
val text1 = sc.textFile("text1.txt")
val text2 = sc.textFile("text2.txt")
//GET START TIME val time1 = java.lang.System.currentTimeMillis() //TEXT ONE
val t1WordCount = text1.flatMap(line => line.split(" ")).count()
val t1SentenceCount t1SplitChars = text1.flatMap(line => line.split("")).filter(character => character = ".").count()
val t1SentenceCount = t1SplitChars.filter(character => character.matches("[.!?]")).count()
val t1CommaCount = t1SplitChars.filter(character => character.matches("[,]")).count()
val t1ColonCount = t1SplitChars.filter(character => character.matches("[:;]")).count()
val t1CharacterCount = t1SplitChars.count()
val t1ParagraphCount = text1.count()
val t1AverageWordsPerSentence = t1WordCount/1.0/t1SentenceCount
val t1AverageWordLength = t1CharacterCount/1.0/t1WordCount
val t1AverageSentencePerParagraph = t1SentenceCount/1.0/t1ParagraphCount
val t1AverageCommasPerSentence = t1CommaCount/1.0/t1SentenceCount
val t1AverageColonsPerParagraph = t1ColonCount/1.0/t1ParagraphCount
//TEXT TWO val t2WordCount = text2.flatMap(line => line.split(" ")).count() val t2SplitChars = text2.flatMap(line => line.split(""))
val t2SentenceCount = t2SplitChars.filter(character => character.matches("[.!?]")).count()
val t2CommaCount = t2SplitChars.filter(character => character.matches("[,]")).count()
val t2ColonCount = t2SplitChars.filter(character => character.matches("[:;]")).count()
val t2CharacterCount = t2SplitChars.filter(character => character.matches("[a-zA-Z]")).count()
val t2ParagraphCount = text2.count()
//GET END TIME
val time2 = java.lang.System.currentTimeMillis()
val t2AverageWordsPerSentence = t2WordCount/1.0/t2SentenceCount
val t2AverageWordLength = t2CharacterCount/1.0/t2WordCount
val t2AverageSentencePerParagraph = t2SentenceCount/1.0/t2ParagraphCount
val t2AverageCommasPerSentence = t2CommaCount/1.0/t2SentenceCount
val t2AverageColonsPerParagraph = t2ColonCount/1.0/t2ParagraphCount
val totalDiff =
Math.abs(t1AverageWordsPerSentence - t2AverageWordsPerSentence)+
Math.abs(t1AverageWordLength - t2AverageWordLength)+
Math.abs(t1AverageSentencePerParagraph - t2AverageSentencePerParagraph)+
Math.abs(t1AverageCommasPerSentence - t2AverageCommasPerSentence)+
Math.abs(t1AverageColonsPerParagraph - t2AverageColonsPerParagraph)
println("Words: "+t1WordCount+"::"+t2WordCount)
println("Sentences: "+t1SentenceCount+"::"+t2SentenceCount)
println("Commas: "+t1CommaCount+"::"+t2CommaCount)
println("Colons: "+t1ColonCount+"::"+t2ColonCount)
println("Characters: "+t1CharacterCount+"::"+t2CharacterCount)
println("Paragraphs: "+t1ParagraphCount+"::"+t2ParagraphCount)
println("Words/sentence: "+t1AverageWordsPerSentence+"::"+t2AverageWordsPerSentence)
println("Word legnth: "+t1AverageWordLength+"::"+t2AverageWordLength)
println("Sentence/paragraph: "+t1AverageSentencePerParagraph+"::"+t2AverageSentencePerParagraph)
println("Commas/sentence: "+t1AverageCommasPerSentence+"::"+t2AverageCommasPerSentence)
println("Colons/paragraph: "+t1AverageColonsPerParagraph+"::"+t2AverageColonsPerParagraph)
println("The two pieces have a: " + totalDiff + "% different style of writing.")
if (totalDiff <= 5){
println(">> It is likley this was written by the same person.")
}else{
println(">> It is likley this was written by two different people.")
}
println(">>>> Total Time Taken is: "+(time2-time1)+"ms")
sc.stop
}
}
</nowiki>
 
 
 
==Compare To CPP + Open MP==
Spark is not meant to be run with small data on a single machine, but just for fun, I decided to compare it to a program written in cpp, and optimized using Open MP.
 
I took six pieces of text to comparer them, and see how fast they would execute. Now Spark is not meant to be used on small pieces of data, but without terabytes of data to use, and without a network of computers to test it on, I decided to run relatively small pieces of text on a single computer.
 
 
{| class="wikitable"
|-
| File Size || Text one characters || Text two characters ||Combined
|-
| Small || 2524 || 2473 || 4997
|-
| Medium || 6992 || 4344 || 11336
|-
| Large || 120375 || 129858 || 250233
|}
 
 
When I ran the programs I had the following results
 
 
{| class="wikitable"
|-
| File Size || CPP (ms) || CPP+OpenMP (ms) || Scala+Spark (ms)
|-
| Small || 3 || 21 || 13539
|-
| Medium || 8 || 26 || 13551
|-
| Large || 157 || 50 || 14135
|}
 
[[Image:Graph_Spark_Vs_CPP.png|500px| ]]
 
What this shows is that the overhead of each parallelization technique is important to consider. Since scala+spark is meant to run across multiple computers, there is no optimization when running on a single machine, and as you can see by the chart above, the time taken to complete the program increases with the file size.
 
===CPP Code===
 
<nowiki>
#include <iostream>
#include <fstream>
#include <cstring>
#include <chrono>
#include <string>
 
#include <omp.h>
 
using namespace std::chrono;
 
// report system time
//
void reportTime(const char* msg, steady_clock::duration span) {
auto ms = duration_cast<milliseconds>(span);
std::cout << std::endl << msg << " - took - " <<
ms.count() << " milliseconds" << std::endl;
}
 
void processString(int len, std::string str, int& totalSentenceCount, int& totalWordCount, int& totalCharCount, int& totalParagraphCount, int& totalCommas, int& totalColons, double& averageWordsPerSentence, double& averageWordLength, double& averageSentencePerParagraph, double& averageCommasPerSentence, double& averageColonsPerParagraph){
for (int i = 0; i < len; i++){
if (str[i] == '.' || str[i] == '!' || str[i] == '?' || str[i] == ' ' || str[i] == '\r' || str[i] == '\n' || str[i] == '\t' || str[i] == ':' || str[i] == ';' || str[i] == ',') {
if (str[i] == '.' || str[i] == '!' || str[i] == '?'){
totalSentenceCount++;
}else if(str[i] == '\n'){
totalParagraphCount++;
}
else if (str[i] == ','){
totalCommas++;
}
else if (str[i] == ':' || str[i] == ';'){
totalColons++;
}
else{
totalWordCount++;
}
}
else{
totalCharCount++;
}
}
averageWordsPerSentence = totalWordCount / 1.0 / totalSentenceCount;
averageWordLength = totalCharCount / 1.0 / totalWordCount;
averageSentencePerParagraph = totalSentenceCount / 1.0 / totalParagraphCount;
averageCommasPerSentence = totalCommas / 1.0 / totalSentenceCount;
averageColonsPerParagraph = totalColons / 1.0 / totalParagraphCount;
 
}
 
void processStringOMP(int len, std::string str, int& totalSentenceCount, int& totalWordCount, int& totalCharCount, int& totalParagraphCount, int& totalCommas, int& totalColons, double& averageWordsPerSentence, double& averageWordLength, double& averageSentencePerParagraph, double& averageCommasPerSentence, double& averageColonsPerParagraph){
int twc = 0, tsc = 0, tpc=0, tc= 0, tco=0, tcc = 0;
#pragma omp parallel for reduction(+:twc,tsc,tpc,tc,tco,tcc)
for (int i = 0; i < len; i++){
if (str[i] == '.' || str[i] == '!' || str[i] == '?' || str[i] == ' ' || str[i] == '\r' || str[i] == '\n' || str[i] == '\t' || str[i] == ':' || str[i] == ';' || str[i] == ',') {
if (str[i] == '.' || str[i] == '!' || str[i] == '?'){
tsc++;
}
else if (str[i] == '\n'){
tpc++;
}
else if (str[i] == ','){
tc++;
}
else if (str[i] == ':' || str[i] == ';'){
tco++;
}
else{
twc++;
}
}
else{
tcc++;
}
}
totalWordCount = twc;
totalSentenceCount = tsc;
totalParagraphCount = tpc;
totalCommas = tc;
totalColons = tco;
totalCharCount = tcc;
 
averageWordsPerSentence = totalWordCount / 1.0 / totalSentenceCount;
averageWordLength = totalCharCount / 1.0 / totalWordCount;
averageSentencePerParagraph = totalSentenceCount / 1.0 / totalParagraphCount;
averageCommasPerSentence = totalCommas / 1.0 / totalSentenceCount;
averageColonsPerParagraph = totalColons / 1.0 / totalParagraphCount;
 
}
 
int main(int argc, char** argv) {
 
if (argc != 3) {
std::cerr << "*** Incorrect number of arguments ***\n";
std::cerr << "Usage: " << argv[0] << " first file\n";
std::cerr << "Usage: " << argv[1] << " second file\n";
return 1;
}
 
std::ifstream t1(argv[1]);
std::string str1((std::istreambuf_iterator<char>(t1)),
(std::istreambuf_iterator<char>()));
 
std::ifstream t2(argv[2]);
std::string str2((std::istreambuf_iterator<char>(t2)),
(std::istreambuf_iterator<char>()));
 
int len1 = str1.size();
int len2 = str2.size();
 
//define all required variables
int totalWordCount1 = 0, totalWordCount2 = 0,
totalCharCount1 = 0, totalCharCount2 = 0,
totalSentenceCount1 = 0, totalSentenceCount2 = 0,
totalParagraphCount1 = 0, totalParagraphCount2 = 0,
totalCommas1 = 0, totalCommas2 = 0,
totalColons1 = 0, totalColons2 = 0;
double averageWordsPerSentence1, averageWordsPerSentence2,
averageWordLength1, averageWordLength2,
averageSentencePerParagraph1, averageSentencePerParagraph2,
averageCommasPerSentence1, averageCommasPerSentence2,
averageColonsPerParagraph1, averageColonsPerParagraph2;
 
 
int n = std::atoi(argv[1]);
steady_clock::time_point ts, te;
 
//time start
ts = steady_clock::now();
 
//Start counting first file.
processString(len1, str1, totalSentenceCount1, totalWordCount1, totalCharCount1, totalParagraphCount1, totalCommas1, totalColons1, averageWordsPerSentence1, averageWordLength1, averageSentencePerParagraph1, averageCommasPerSentence1, averageColonsPerParagraph1);
 
//Start counting second file.
processString(len2, str2, totalSentenceCount2, totalWordCount2, totalCharCount2, totalParagraphCount2, totalCommas2, totalColons2, averageWordsPerSentence2, averageWordLength2, averageSentencePerParagraph2, averageCommasPerSentence2, averageColonsPerParagraph2);
 
//time end
te = steady_clock::now();
 
std::cout << "The first text has : " << totalCharCount1 << " characters, " << totalWordCount1 << " words, " << totalSentenceCount1 << " sentences, " << std::endl;
std::cout << totalParagraphCount1 << " paragraphs, " << totalCommas1 << " commas, " << totalColons1 << " colons, " << std::endl;
std::cout << averageWordsPerSentence1 << " average words/sentence, " << averageWordLength1 << " average characters/word," << std::endl;
std::cout << averageSentencePerParagraph1 << " average sentences/paragraph, " << averageCommasPerSentence1 << " average commas/sentence, " << std::endl;
std::cout << averageColonsPerParagraph1 << " average colons/paragraph. " << std::endl << std::endl;
std::cout << "The second text has : " << totalCharCount2 << " characters, " << totalWordCount2 << " words, " << totalSentenceCount2 << " sentences, " << std::endl;
std::cout << totalParagraphCount2 << " paragraphs, " << totalCommas2 << " commas, " << totalColons2 << " colons, " << std::endl;
std::cout << averageWordsPerSentence2 << " average words/sentence, " << averageWordLength2 << " average characters/word," << std::endl;
std::cout << averageSentencePerParagraph2 << " average sentences/paragraph, " << averageCommasPerSentence2 << " average commas/sentence, " << std::endl;
std::cout << averageColonsPerParagraph2 << " average colons/paragraph. " << std::endl << std::endl;
 
//how similar is the writing?
double totalDiff = 0;
totalDiff += abs(averageWordsPerSentence1 - averageWordsPerSentence2);
totalDiff += abs(averageWordLength1 - averageWordLength2);
totalDiff += abs(averageSentencePerParagraph1 - averageSentencePerParagraph2);
totalDiff += abs(averageCommasPerSentence1 - averageCommasPerSentence2);
totalDiff += abs(averageColonsPerParagraph1 - averageColonsPerParagraph2);
std::cout << "The two pieces have a: " << totalDiff << "% different style of writing." << std::endl << std::endl;
if (totalDiff <= 5){
std::cout << ">> It is likley this was written by the same person." << std::endl;
}
else{
std::cout << ">> It is likley this was written by two different people." << std::endl;
}
 
reportTime("Test", te - ts);
std::cout << "\n\nPress Enter To Exit...";
std::cin.get();
 
}
 
</nowiki>
 
==Thoughts On Spark==
 
Spark is great for processing big data, but not so much for smaller data that can be processed on a single machine. I can see why Spark is growing in popularity among big data because of its ease of use, and its easy integration.
 
==Thoughts On Scala==
 
Scala is surprisingly easy to learn. It is based off of Java, and there is a large community that is using it, so there is a lot of support out there, and most questions you have will already be answered online.
 
==How Does This Help You?==
There are a lot of jobs, and there is a lot of money in big data. The average salary of a Sr. Scala developer is $152,000, Average salary of Sr. Java developer is $88,000, Average salary of Sr. C/C++ developer is $89,000.
 
Is it worth learning Scala?