GPU621/Apache Spark Fall 2022
Apache Spark
Apache Spark Core API
RDD Overview
One of the most important concepts in Spark is a resilient distributed dataset (RDD). RDD is a collection of elements partitioned across the nodes of the cluster that can be operated in parallel. RDDs are created by starting with a file, or an existing Java collection in the driver program, and transforming it. We will introduce some key APIs provided by Spark Core 2.2.1 using Java 8. You can find more information about the RDD here. https://spark.apache.org/docs/2.2.1/rdd-programming-guide.html
Spark Library Installation Using Maven
An Apache Spark application can be easily instantiated using Maven. To add the required libraries, you can copy and paste the following code into the "pom.xml".
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.2.0</version> </dependency> </dependencies>
Create And Set Up Spark
Spark needs to be set up in a cluster so first we need to create a JavaSparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application. We will talk about how to set up a spark in a cluster later. Now let's try to create a spark locally. To do that, we will need the following code:
//create and set up spark SparkConf conf = new SparkConf().setAppName("HelloSpark").setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); sc.setLogLevel("WARN");
Create RDDs
There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
1. Parallelized Collections
Let’s start with some Java collections by calling JavaSparkContext’s parallelize method on an existing Collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
//create input data list List<Integer> inputData = new ArrayList<>(); inputData.add(11); inputData.add(22); inputData.add(33); inputData.add(44);
//use RDD to run create RDDS JavaRDD<Integer> javaRDD = sc.parallelize(inputData);
2. External Datasets
The other way is to create RDD from any storage source supported by Hadoop, including your local file system, HDFS, Amazon S3, etc. Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI) and reads it as a collection of lines.
//From local file JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt"); //From a S3 file JavaRDD<String> sentences = sc.textFile("s3://gpu621-demo/input.txt");
RDD APIs
Basically there are types of APIs. Transformations and Actions. Transformation APIs are functions that could return another RDD set. Using these APIs, we can create child RDD from parent RDD. Actions are the functions we want to perform onto the actual dataset. They will not return new RDDs.
1. Transformations
1.1 map(func)
The map function iterates over every line in RDD and split into new RDD. It receives a function, and will use that function to each line and create new RDD.
//create input data list List<Integer> inputData = new ArrayList<>(); inputData.add(11); inputData.add(22); inputData.add(33); inputData.add(44);
//create RDD JavaRDD<Integer> javaRDD = sc.parallelize(inputData); // map from one RDD to another RDD JavaRDD<Double> mapRDD = javaRDD.map(value -> Math.sqrt(value));
//Output: 3.3166247903554, 4.69041575982343, 5.744562646538029, 6.6332495807108
mapRDD.foreach(value->System.out.println(value));
1.2 filter(func)
filter() is used when we only want some elements that meet the conditions. When we use this function, we need to pass another predicate function.
//create input data list List<Integer> inputData = new ArrayList<>(); inputData.add(11); inputData.add(22); inputData.add(33); inputData.add(44);
// create RDD JavaRDD<Integer> javaRDD = sc.parallelize(inputData); //use filter JavaRDD<Double> filterRDD = javaRDD.filter(value ->value>=30);
//Output: 33 44
filterRDD.foreach(value->System.out.println(value));
1.3 flatMap(func)
flatMap is similar to map, but the difference is that flatMap could return multiple elements from one input.
//create input data list List<String> inputData = new ArrayList<>(); inputData.add("WARN: Monday 1 May 0101"); inputData.add("ERROR: Monday 1 May 0102"); inputData.add("WARN: Monday 2 May 0301"); inputData.add("ERROR: Monday 3 May 0401"); inputData.add("FATAL: Monday 4 May 0406"); // create RDD JavaRDD<String> sentencesRDD = sc.parallelize(inputData); JavaRDD<String> wordsRDD = sentencesRDD.flatMap(value -> Arrays.asList(value.split(" ")).iterator());
1.4 mapToPair()
It is similar to map transformation; however, this transformation produces PairRDD, that is, an RDD consisting of key and value pairs. This transformation is specific to Java RDDs. With other RDDs, map transformation can perform both (map and mapToPair()) of the tasks.
List<String> inputData = new ArrayList<>(); inputData.add("WARN: Monday 1 May 0101"); inputData.add("ERROR: Monday 1 May 0102"); inputData.add("WARN: Monday 2 May 0301"); inputData.add("ERROR: Monday 3 May 0401"); inputData.add("FATAL: Monday 4 May 0406"); // create RDD JavaRDD<String> logMessages = sc.parallelize(inputData); //use Fluent API JavaPairRDD<String, Long> levelRDD= logMessages.mapToPair(value ->new Tuple2<>(value.split(":")[0], 1L)); // (WARN,1) (ERROR,1) (ERROR,1) (FATAL,1) (WARN,1) levelRDD .foreach(tuple2->System.out.println(tuple2));
1.5 reduceByKey(func)
When we use reduceByKey on a dataset (K, V), the pairs on the same machine with the same key are combined.
List<String> inputData = new ArrayList<>(); inputData.add("WARN: Monday 1 May 0101"); inputData.add("ERROR: Monday 1 May 0102"); inputData.add("WARN: Monday 2 May 0301"); inputData.add("ERROR: Monday 3 May 0401"); inputData.add("FATAL: Monday 4 May 0406"); // create RDD JavaRDD<String> logMessages = sc.parallelize(inputData); //use Fluent API JavaPairRDD<String, Long> levelRDD= logMessages.mapToPair(value ->new Tuple2<>(value.split(":")[0], 1L)); // (WARN,2) (ERROR,2) (FATAL,1) levelRDD . reduceByKey ((a,b)->a+b)
.forEach(t->System.out.println(t))
1.6 sortByKey()
When we apply the sortByKey() function on a dataset of (K, V) pairs, the data is sorted according to the key K in another RDD.
2. Actions
When the action is triggered after the result, new RDD is not formed like transformation. Thus, Actions are Spark RDD operations that give non-RDD values. The values of action are stored to drivers or to the external storage system. It brings laziness of RDD into motion. reduce(func)
1. Reduce()
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one).
//create input data list List<Integer> inputData = new ArrayList<>(); inputData.add(11); inputData.add(22); inputData.add(33); inputData.add(44); //use RDD to run reduce function JavaRDD<Integer> javaRDD = sc.parallelize(inputData); Integer result = javaRDD.reduce((Integer a, Integer b) -> a + b);
//output: 110
System.out.println(result);
2. Count()
count() returns the number of elements in RDD.
3. take(n)
The action take(n) returns n number of elements from RDD. It tries to cut the number of partition it accesses, so it represents a biased collection. We cannot presume the order of the elements.
4. collect()
The action collect() is the common and simplest operation that returns our entire RDDs content to driver program.
5. foreach()
When we have a situation where we want to apply operation on each element of RDD, but it should not return value to the driver. In this case, foreach() function is useful.
Useful Case
Scenario: If you have a video caption script, you want to sort all the word by frequency. How should we do it?
//create and set up spark SparkConf conf = new SparkConf().setAppName("HelloSpark").setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); sc.setLogLevel("WARN");
//read data from text file //note that for production mode, we should not use file from local machine //should use S3 or other resources JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt");
//remove the time stamp JavaRDD<String> lettersOnlyRDD = sentences.map(sentence -> sentence.replaceAll("[^a-zA-Z\\s]", "").toLowerCase()); //remove the blank line JavaRDD<String> removeBlankLineRDD = lettersOnlyRDD.filter(sentence -> sentence.trim().length() > 0); //map to only words JavaRDD<String> wordsRDD = removeBlankLineRDD.flatMap(sentence -> Arrays.asList(sentence.split(" ")).iterator());
//create pair
JavaPairRDD<String, Long> pairRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1L));
//get frequency
JavaPairRDD<String, Long> totals = pairRDD.reduceByKey((a, b) -> a + b);
//make frequency as the key
JavaPairRDD<Long, String> reversedMap = totals.mapToPair(t -> new Tuple2(t._2, t._1));
//sort
List<Tuple2<Long, String>> results = reversedMap.sortByKey(false).collect();
results.forEach(t->System.out.println(t));
//close
sc.close();
Deploy Apache Spark Application On AWS
Amazon EMR is a cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto provided by AWS cloud service. EMR is easy to use and it has low cost, so it’s a great start for spark beginners.
Prerequisite
From here, I will assume you have an AWS service account and that you have basic knowledge about AWS services like how to use S3 bucket, or how to add role or policy to services.
Also, you will need to have basic knowledge about SSH and Linux commands.
Create an EMR cluster
Search and choose EMR on AWS service panel.
-IMAGE-
Click the Create Cluster button.
-IMAGE-
Enter as cluster name and choose a release version. Here I will choose the EMR-5.11.1 for the Release version. For the application, you can see that there are many options, we will choose Spark as this is our main topic.
-IMAGE-
Next, we need to choose an instance type. As you may know, the cluster will run on multiple EC2 instances and different EC2 instances have different features. Please note, different EC2 types cost differently. Please refer to the EC2 type table to check the prices. Here I will choose c4.large type as it’s the most inexpensive one. For the number of instances, I will choose 3, that is, one master and 2 nodes.
For the security part. Please choose an EC2 key pair you already used for other services before, or create a new one.
Click Create Cluster button to wait for the cluster to be set up.
-IMAGE-
You will see a page like this. Next, we need to change the security group for Master, which acts like a firewall to add an inbound rule.
-IMAGE-
We need to open port 22 and port 18080 for your IP so that you can visit the Master EC2.
Then, you can try to ssh to the master node by using
ssh -I <private_key.pem> hadoop@<MasterPublicDNS>
You should see a welcome page like this:
-IMAGE-
Create an S3 bucket
Unlike the previous case where we run on a solo computer, now we need to run the application on different nodes. It makes no sense to read the file from a local hard disk because most of the time the file will be too big for one node to handle. We need to put the file onto something that all nodes can share, and we can use aN S3 file as the input file. S3 is another service AWS provides. The size of a single file on S3 can be as large as 5TB. I will skip this part and please search how to create a new bucket to hold both the input file and the application package. Please make the bucket open to the public so you will not have permission issue later on.
-IMAGE-
Build the application
1. Change code
In order to run it on the cloud cluster, we need to do some modifications to the code. First, let’s change the file path from a local position to the s3.
From
JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt");
To
JavaRDD<String> sentences = sc.textFile("s3://gpu621-demo/input.txt");
Also, add the entry point class to the pom file:
-IMAGE-
2. Build the package
Build the package using command line or the IDE. If you are using Idea, click the package under lifecycle Tab of maven
-IMAGE-
You will get a jar file under the target folder.
Upload the jar file to your S3 bucket
Upload the jar file into the S3 bucket you created before.
Run the application on the cluster
Ssh into the master node of the cluster. Then issue a copy command to copy the jar file to the master node using:
aws s3 cp <s3://yourbucket/jarFileName.jar>
Then you can run the app using:
Spark-submit <jarFileName.jar>
Then you should see the log info and the output of your application.
-IMAGE-
That’s how we deploy a spark app on aws EMR.