== Applications ==
* In the healthcare sector: curing diseases, reducing medical costs, predicting and managing epidemics, and maintaining quality of life by tracking large-scale health indexes and metrics.
* In the financial industry: assessing risk, building investment models, and creating trading algorithms.
* In the retail industry: analyzing structured and unstructured data to understand and serve customers, detecting and preventing fraud, forecasting inventory, etc.
== Architecture ==
[[File: Cluster-overview.png|thumb|upright=1.5|right|alt=Spark cluster|4.1 Spark Cluster components]]
One of the distinguishing features of Spark is that it processes data in RAM using a concept known as Resilient Distributed Datasets (RDDs): immutable distributed collections of objects, which can contain any type of Python, Java, or Scala object, including user-defined classes. Each dataset is divided into logical partitions, which may be computed on different nodes of the cluster. Spark's RDDs function as a working set for distributed programs, offering a restricted form of distributed shared memory. Another important abstraction in Spark is the Directed Acyclic Graph (DAG), the scheduling layer that implements stage-oriented scheduling.
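A minimal sketch of the RDD and DAG abstractions using the Spark Java API is shown below; the class name, app name, and local master are illustrative rather than anything from this project.

// RddSketch.java (illustrative sketch)
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RddSketch {
    public static void main(String[] args) {
        // App name and local master are placeholders for illustration.
        SparkConf conf = new SparkConf().setAppName("rdd sketch").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // An RDD can be created by parallelizing an in-memory collection (or by reading a file).
        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Transformations such as map() are lazy: they only extend the DAG that records the RDD's lineage.
        JavaRDD<Integer> squares = numbers.map(x -> x * x);

        // An action such as collect() hands the DAG to the scheduler, which splits it into stages and runs them.
        List<Integer> result = squares.collect();
        System.out.println(result);                  // [1, 4, 9, 16, 25]
        System.out.println(squares.toDebugString()); // prints the recorded lineage

        sc.stop();
    }
}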
=== Compatibility ===
Spark can run as a standalone application or on top of Hadoop YARN or Apache Mesos. Spark supports data sources that implement Hadoop input format, so it can integrate with all the same data sources and file formats that Hadoop supports.
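For example, the same packaged application can be launched on the standalone master or on YARN simply by changing the --master option of spark-submit (the example jar path below is a placeholder that depends on the Spark installation):

<Code> spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster /path/to/spark-examples.jar 100 </Code>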
[[File:Hadoop-vs-spark.png|upright=2|right|300px]]
=== Data Processing ===
In addition to plain data processing, Spark can also process graphs, and it includes the MLlib machine learning library. Thanks to its performance, Spark can handle both real-time and batch processing, whereas Hadoop MapReduce is suited only to batch processing.
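As a rough illustration of the real-time side, the Spark Streaming sketch below counts words arriving on a network socket; the class name, host, and port are placeholders rather than anything from this project.

// StreamingWordCount.java (illustrative sketch)
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class StreamingWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("streaming word count").setMaster("local[2]");
        // Process the stream in one-second micro-batches.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Placeholder source: text lines arriving on localhost:9999.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        JavaPairDStream<String, Integer> counts = lines
            .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
            .mapToPair(word -> new Tuple2<>(word, 1))
            .reduceByKey(Integer::sum);
        counts.print();   // print each batch's counts to the driver log

        jssc.start();
        jssc.awaitTermination();
    }
}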
'''Ensure that the Dataproc, Compute Engine, and Cloud Storage APIs are all enabled'''
# Go to '''Menu -> API & Services -> Library'''.
# Search for each API name and enable it if it is not already enabled.
'''Create a Cloud Storage bucket'''
# Go to '''Menu -> Storage -> Browser -> Create Bucket'''.
# Make a note of the bucket name.
# To copy the jar from the VM's local disk to the Cloud Storage bucket, enter the following command in the shell:
<Code> gsutil cp /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar gs://<myBucketName>/ </Code>
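Assuming a Dataproc cluster has already been created (the cluster name, region, and input/output paths below are placeholders), the copied jar can then be submitted as a Hadoop job from the shell:

<Code> gcloud dataproc jobs submit hadoop --cluster=<myClusterName> --region=<myRegion> --jar=gs://<myBucketName>/hadoop-mapreduce-examples.jar -- wordcount gs://<myBucketName>/input gs://<myBucketName>/output </Code>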
 
 
'''WordCount.java from the Hadoop MapReduce examples'''
 
// WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Mapper: tokenizes each input line and emits (word, 1) for every token.
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer (also used as the combiner): sums the counts for each word.
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
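For comparison, a roughly equivalent word count written against the Spark Java API might look like the sketch below; the class name and the use of args for input/output paths mirror the MapReduce version above and are illustrative.

// SparkWordCount.java (illustrative sketch)
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark word count");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // args[0] is the input path, args[1] the output path, mirroring the MapReduce version.
        JavaRDD<String> lines = sc.textFile(args[0]);
        JavaPairRDD<String, Integer> counts = lines
            .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())  // split lines into words
            .mapToPair(word -> new Tuple2<>(word, 1))                       // emit (word, 1) pairs
            .reduceByKey(Integer::sum);                                     // sum the counts per word

        counts.saveAsTextFile(args[1]);   // the action that triggers execution
        sc.stop();
    }
}

The mapper and reducer collapse into a short chain of lazy RDD transformations, with saveAsTextFile() acting as the action that triggers execution.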
Using the same hardware (RAM, CPUs, HDD) across a 6-node cluster and processing the same data (8 .txt files totalling 7.77 GB), we see only about a 12% performance improvement of Spark over Hadoop MapReduce with the word count algorithm. This falls far short of the advertised 10 times faster on disk and 100 times faster in memory.
Spark does require more memory than Hadoop in order to cache data in memory; however, that should not be a limitation in this case, as the worker nodes have 15 GB of memory and none of the input files exceeds 2 GB. This is more than enough space for Spark to hold the input data in memory as resilient distributed datasets (RDDs). It is worth noting that Spark performs best when iterating over the same data many times, while MapReduce was designed for single-pass jobs. Furthermore, Spark's typical use cases likely involve hundreds to thousands of nodes with terabytes of data.
Further testing and analysis of Spark's internal metrics could determine whether any bottlenecks are limiting Spark's performance. For example, how well is the cluster utilizing the hardware, namely the RAM?
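One hypothetical way to start that analysis is to cache the input explicitly and then check the Storage tab of the Spark web UI (port 4040 on the driver) to see how much of the data actually fits in executor memory; the bucket path and class name below are placeholders.

// CacheCheck.java (illustrative sketch; bucket path is a placeholder)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CacheCheck {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("cache check"));

        // Cache the input explicitly so the Storage tab of the Spark web UI
        // reports how much of the dataset actually fits in executor memory.
        JavaRDD<String> lines = sc.textFile("gs://<myBucketName>/input/*.txt");
        lines.persist(StorageLevel.MEMORY_ONLY());

        long count = lines.count();   // an action forces the RDD to be materialized and cached
        System.out.println("lines: " + count);

        sc.stop();
    }
}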