GPU621/Apache Spark Fall 2022

==Apache Spark Core API==
// 33 44
filterRDD.foreach(value->System.out.println(value));
 
1.3 flatMap(func)
 
flatMap is similar to map, but the difference is that flatMap can return zero or more output elements for each input element; the returned iterator is flattened into the resulting RDD.
 
//create input data list
List<String> inputData = new ArrayList<>();
inputData.add("WARN: Monday 1 May 0101");
inputData.add("ERROR: Monday 1 May 0102");
inputData.add("WARN: Monday 2 May 0301");
inputData.add("ERROR: Monday 3 May 0401");
inputData.add("FATAL: Monday 4 May 0406");
// create RDD
JavaRDD<String> sentencesRDD = sc.parallelize(inputData);
JavaRDD<String> wordsRDD = sentencesRDD.flatMap(value -> Arrays.asList(value.split(" ")).iterator());
 
1.4 mapToPair()
 
It is similar to the map transformation; however, it produces a JavaPairRDD, that is, an RDD consisting of key-value pairs (Tuple2 objects). This transformation is specific to the Java API; in Scala, a plain map that returns tuples already yields a pair RDD, so a separate mapToPair() is not needed.
 
List<String> inputData = new ArrayList<>();
inputData.add("WARN: Monday 1 May 0101");
inputData.add("ERROR: Monday 1 May 0102");
inputData.add("WARN: Monday 2 May 0301");
inputData.add("ERROR: Monday 3 May 0401");
inputData.add("FATAL: Monday 4 May 0406");
// create RDD
JavaRDD<String> logMessages = sc.parallelize(inputData);
//use Fluent API
JavaPairRDD<String, Long> levelRDD = logMessages.mapToPair(value -> new Tuple2<>(value.split(":")[0], 1L));
// (WARN,1) (ERROR,1) (ERROR,1) (FATAL,1) (WARN,1)
levelRDD.foreach(tuple2 -> System.out.println(tuple2));
 
1.5 reduceByKey(func)
 
When we use reduceByKey on a dataset of (K, V) pairs, values that share the same key are combined with the given function. Spark first merges pairs locally within each partition and then combines the partial results across partitions, which reduces the amount of data shuffled.
 
List<String> inputData = new ArrayList<>();
inputData.add("WARN: Monday 1 May 0101");
inputData.add("ERROR: Monday 1 May 0102");
inputData.add("WARN: Monday 2 May 0301");
inputData.add("ERROR: Monday 3 May 0401");
inputData.add("FATAL: Monday 4 May 0406");
// create RDD
JavaRDD<String> logMessages = sc.parallelize(inputData);
//use Fluent API
JavaPairRDD<String, Long> levelRDD = logMessages.mapToPair(value -> new Tuple2<>(value.split(":")[0], 1L));
// (WARN,2) (ERROR,2) (FATAL,1)
levelRDD.reduceByKey((a, b) -> a + b)
        .foreach(t -> System.out.println(t));
 
1.6 sortByKey()
 
When we apply the sortByKey() function to an RDD of (K, V) pairs, it returns a new RDD in which the pairs are sorted by the key K (ascending by default; pass false for descending order).
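 
For example, a minimal sketch that reuses the logMessages RDD and the per-level counts from the examples above:
 
JavaPairRDD<String, Long> counts = logMessages
        .mapToPair(value -> new Tuple2<>(value.split(":")[0], 1L))
        .reduceByKey((a, b) -> a + b);
// sortByKey() is ascending by default; sortByKey(false) sorts in descending order
// collect() preserves the sorted order on the driver: (ERROR,2) (FATAL,1) (WARN,2)
counts.sortByKey().collect().forEach(t -> System.out.println(t));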
 
2. Actions
 
Unlike a transformation, an action does not produce a new RDD. Actions are Spark RDD operations that return non-RDD values; the results are sent back to the driver program or written to an external storage system. Because transformations are lazy, it is the action that actually sets the computation in motion.
 
1. reduce(func)
 
Aggregate the elements of the dataset using a function func that takes two arguments and returns one. The function should be commutative and associative so that it can be computed correctly in parallel.
 
//create input data list
List<Integer> inputData = new ArrayList<>();
inputData.add(11);
inputData.add(22);
inputData.add(33);
inputData.add(44);
//use RDD to run reduce function
JavaRDD<Integer> javaRDD = sc.parallelize(inputData);
Integer result = javaRDD.reduce((Integer a, Integer b) -> a + b);
//output: 110
System.out.println(result);
 
2. Count()
 
count() returns the number of elements in the RDD.
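 
A minimal sketch, assuming sc is the JavaSparkContext created earlier:
 
JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(11, 22, 33, 44));
// output: 4
System.out.println(javaRDD.count());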
 
3. take(n)
 
The action take(n) returns the first n elements of the RDD. It tries to access as few partitions as possible, so the result is a biased sample of the data and we cannot rely on the order of the elements.
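 
A minimal sketch, using the same list of integers as in the reduce() example:
 
JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(11, 22, 33, 44));
// returns the first 2 elements as a java.util.List
// output: [11, 22]
List<Integer> firstTwo = javaRDD.take(2);
System.out.println(firstTwo);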
 
4. collect()
 
The action collect() is the most common and simplest operation; it returns the entire content of the RDD to the driver program, so the whole dataset must fit in the driver's memory.
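 
A minimal sketch, again assuming the sc context from earlier:
 
JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(11, 22, 33, 44));
// brings the whole RDD back to the driver as a list
// output: [11, 22, 33, 44]
List<Integer> allValues = javaRDD.collect();
System.out.println(allValues);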
 
5. foreach()
 
When we want to apply an operation to each element of an RDD without returning a value to the driver, the foreach() function is useful.
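 
A minimal sketch; note that the lambda runs on the executors, so in cluster mode the output goes to the executors' logs rather than to the driver console:
 
JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(11, 22, 33, 44));
// prints each element; nothing is returned to the driver
javaRDD.foreach(value -> System.out.println(value));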
 
===Useful Case===
 
Scenario: given a video caption (subtitle) script, we want to sort all of the words by frequency. How should we do it?
 
//create and set up spark
SparkConf conf = new SparkConf().setAppName("HelloSpark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel("WARN");
 
//read data from text file
//note that for production mode, we should not use file from local machine
//should use S3 or other resources
JavaRDD<String> sentences = sc.textFile("src/main/resources/subtitles/input.txt");
 
//keep letters and whitespace only (this strips the time stamps and punctuation) and convert to lowercase
JavaRDD<String> lettersOnlyRDD = sentences.map(sentence -> sentence.replaceAll("[^a-zA-Z\\s]", "").toLowerCase());
//remove blank lines
JavaRDD<String> removeBlankLineRDD = lettersOnlyRDD.filter(sentence -> sentence.trim().length() > 0);
//map to only words
JavaRDD<String> wordsRDD = removeBlankLineRDD.flatMap(sentence -> Arrays.asList(sentence.split(" ")).iterator());
//create pair
JavaPairRDD<String, Long> pairRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1L));
//get frequency
JavaPairRDD<String, Long> totals = pairRDD.reduceByKey((a, b) -> a + b);
//swap the pair so the frequency becomes the key
JavaPairRDD<Long, String> reversedMap = totals.mapToPair(t -> new Tuple2<>(t._2, t._1));
//sort by frequency in descending order and collect the results to the driver
List<Tuple2<Long, String>> results = reversedMap.sortByKey(false).collect();
//print each (frequency, word) pair
results.forEach(t -> System.out.println(t));
//close
sc.close();
==Deploy Apache Spark Application On AWS==