Difference between revisions of "GPU621/Apache Spark"

From CDOT Wiki
Jump to: navigation, search
m (Conclusion)
(Applications)
 
(11 intermediate revisions by 2 users not shown)
Line 34: Line 34:
 
== Applications ==
 
== Applications ==
  
* In the healthcare sector for curing diseases, reducing medical costs, predicting and managing epidemics, and maintaining the quality of human life by keeping track of large-scale health index and metrics.  
+
* In the healthcare sector for reducing medical costs, predicting and managing epidemics, and maintaining the quality of human life by keeping track of large-scale health index and metrics.  
 
* In the financial industry to assess risk, build investment models, and create trading algorithms
 
* In the financial industry to assess risk, build investment models, and create trading algorithms
 
* In the retail industry to analyze structured and unstructured data to understand and serve their customers, fraud detection and prevention, inventory forecasting, etc.   
 
* In the retail industry to analyze structured and unstructured data to understand and serve their customers, fraud detection and prevention, inventory forecasting, etc.   
Line 44: Line 44:
 
== Architecture ==
 
== Architecture ==
 
[[File: Cluster-overview.png|thumb|upright=1.5|right|alt=Spark cluster|4.1 Spark Cluster components]]
 
[[File: Cluster-overview.png|thumb|upright=1.5|right|alt=Spark cluster|4.1 Spark Cluster components]]
One of the distinguishing features of Spark is that it processes data in RAM using a concept known as Resilient Distributed Datasets (RDDs) - an immutable distributed collection of objects which can contain any type of Python, Java, or Scala objects, including user-defined classes. Each dataset is divided into logical partitions which may be computed on different nodes of the cluster. Spark's RDDs function as a working set for distributed programs that offer a restricted form of distributed shared memory.  
+
One of the distinguishing features of Spark is that it processes data in RAM using a concept known as Resilient Distributed Datasets (RDDs) - an immutable distributed collection of objects which can contain any type of Python, Java, or Scala objects, including user-defined classes. Each dataset is divided into logical partitions which may be computed on different nodes of the cluster. Spark's RDDs function as a working set for distributed programs that offer a restricted form of distributed shared memory. Another important abstraction in Spark is Directed Acyclic Graph or DAG which is the scheduling layer that implements stage-oriented scheduling.  
  
  
Line 89: Line 89:
 
=== Compatibility ===
 
=== Compatibility ===
 
Spark can run as a standalone application or on top of Hadoop YARN or Apache Mesos. Spark supports data sources that implement Hadoop input format, so it can integrate with all the same data sources and file formats that Hadoop supports.  
 
Spark can run as a standalone application or on top of Hadoop YARN or Apache Mesos. Spark supports data sources that implement Hadoop input format, so it can integrate with all the same data sources and file formats that Hadoop supports.  
 
+
[[File:Hadoop-vs-spark.png|upright=2|right||300px]]
 
=== Data Processing ===
 
=== Data Processing ===
 
In addition to plain data processing, Spark can also process graphs, and it also has the MLlib machine learning library. Due to its high performance, Spark can do both real-time and batch processing. However, Hadoop MapReduce is great only for batch processing.  
 
In addition to plain data processing, Spark can also process graphs, and it also has the MLlib machine learning library. Due to its high performance, Spark can do both real-time and batch processing. However, Hadoop MapReduce is great only for batch processing.  
Line 106: Line 106:
  
 
[[File:Google-cloud-dataproc.png]]
 
[[File:Google-cloud-dataproc.png]]
 +
  
 
# We will use the Google Cloud Platform '''Dataproc''' to deploy a 6 virtual machine (VM) nodes (1 master, 5 workers) cluster that is automatically configured for both Hadoop and Spark.
 
# We will use the Google Cloud Platform '''Dataproc''' to deploy a 6 virtual machine (VM) nodes (1 master, 5 workers) cluster that is automatically configured for both Hadoop and Spark.
Line 111: Line 112:
 
# Store .jar and .py wordcount files and input data in the '''Cloud Storage Bucket'''
 
# Store .jar and .py wordcount files and input data in the '''Cloud Storage Bucket'''
 
# Run a '''Dataproc''' Hadoop MapReduce and Spark jobs to count number of words in large text files and compare the performance between Hadoop and Spark in execution time.
 
# Run a '''Dataproc''' Hadoop MapReduce and Spark jobs to count number of words in large text files and compare the performance between Hadoop and Spark in execution time.
 +
 +
=== What is Dataproc? ===
 +
 +
Dataproc is a managed Spark and Hadoop service that automates tasks for rapid cluster creation and management. Users can use cluster for larg-scale data processing with Spark or Hadoop with the same cluster. Virtual Machine (VM) nodes in the cluster create in minutes, with all node pre-configured and installed with Hadoop, Spark and other tools. Usage is charged by virtual CPU per hour, with standard and higher performance hardware configurations available at different rate.
  
 
=== Setting up Dataproc and Google Cloud Storage===
 
=== Setting up Dataproc and Google Cloud Storage===
Line 125: Line 130:
  
 
[[File:Googlecloud-setup-6b.jpg]]
 
[[File:Googlecloud-setup-6b.jpg]]
 +
  
 
'''We will create 5 worker nodes and 1 master node using the N1 series General-Purpose machine with 4vCPU and 15 GB memory and a disk size of 32-50 GB for all nodes.  
 
'''We will create 5 worker nodes and 1 master node using the N1 series General-Purpose machine with 4vCPU and 15 GB memory and a disk size of 32-50 GB for all nodes.  
 
You can see the cost of your machine configuration per hour. Using machines with more memory, computing power, etc will cost more per hourly use.'''
 
You can see the cost of your machine configuration per hour. Using machines with more memory, computing power, etc will cost more per hourly use.'''
  
[[File:Googlecloud-dataproc-1.jpg]]
+
 
 +
'''Create a cluster with 1 standard master node and 5 worker nodes'''
 +
* Name your cluster and choose a region and zone
 +
* Select a low-cost machine configuration
 +
** I.e. General Purpose N1 4vCPU, 15 GB memory for all nodes
 +
** 32 GB Standard Persistent Disk
 +
[[File:Googlecloud-setup-9.jpg]]
 +
 
  
 
'''Allow API access to all google Cloud services in the project.'''
 
'''Allow API access to all google Cloud services in the project.'''
  
[[File:Googlecloud-setup-9.jpg]]
+
[[File:Googlecloud-dataproc-1.jpg]]
  
  
'''To view the individual nodes in the cluster go to '''Menu -> Virtual Machines -> VM Instances'''
+
'''View the individual nodes in the cluster'''
 +
Go to '''Menu -> Virtual Machines -> VM Instances'''
  
 
[[File:Googlecloud-setup-11b.jpg]]
 
[[File:Googlecloud-setup-11b.jpg]]
Line 142: Line 156:
  
 
'''Ensure that Dataproc, Compute Engine, and Cloud Storage APIs are all enabled'''
 
'''Ensure that Dataproc, Compute Engine, and Cloud Storage APIs are all enabled'''
# Go to '''Menu -> API & Services -> Library.
+
# Menu -> API & Services -> Library.
# Search for the API name and enable them if they are not already enabled.'''
+
# Search for the API name and enable them if they are not already enabled.
  
  
'''Create a Cloud Storage Bucket by going from '''Menu -> Storage -> Browser -> Create Bucket''''''
+
'''Create a Cloud Storage Bucket'''<br>
Make a note of the bucket name.
+
# Menu -> Storage -> Browser -> Create Bucket<br>
 +
# Make a note of the bucket name.
  
  
Line 154: Line 169:
 
# To copy from the VM local disk to Cloud Storage bucket enter the following command in the shell:
 
# To copy from the VM local disk to Cloud Storage bucket enter the following command in the shell:
 
<Code> gsutil cp /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar gs://<myBucketName>/ </Code>
 
<Code> gsutil cp /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar gs://<myBucketName>/ </Code>
 +
 +
 +
'''WordCount.class from Hadoop MapReduce examples'''
 +
 +
// WordCountCount.class
 +
import java.io.IOException;
 +
import java.util.StringTokenizer;
 +
 +
import org.apache.hadoop.conf.Configuration;
 +
import org.apache.hadoop.fs.Path;
 +
import org.apache.hadoop.io.IntWritable;
 +
import org.apache.hadoop.io.Text;
 +
import org.apache.hadoop.mapreduce.Job;
 +
import org.apache.hadoop.mapreduce.Mapper;
 +
import org.apache.hadoop.mapreduce.Reducer;
 +
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 +
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 +
 +
public class WordCount {
 +
 +
  public static class TokenizerMapper
 +
      extends Mapper<Object, Text, Text, IntWritable>{
 +
 +
    private final static IntWritable one = new IntWritable(1);
 +
    private Text word = new Text();
 +
 +
    public void map(Object key, Text value, Context context
 +
                    ) throws IOException, InterruptedException {
 +
      StringTokenizer itr = new StringTokenizer(value.toString());
 +
      while (itr.hasMoreTokens()) {
 +
        word.set(itr.nextToken());
 +
        context.write(word, one);
 +
      }
 +
    }
 +
  }
 +
 +
  public static class IntSumReducer
 +
      extends Reducer<Text,IntWritable,Text,IntWritable> {
 +
    private IntWritable result = new IntWritable();
 +
 +
    public void reduce(Text key, Iterable<IntWritable> values,
 +
                      Context context
 +
                      ) throws IOException, InterruptedException {
 +
      int sum = 0;
 +
      for (IntWritable val : values) {
 +
        sum += val.get();
 +
      }
 +
      result.set(sum);
 +
      context.write(key, result);
 +
    }
 +
  }
 +
 +
  public static void main(String[] args) throws Exception {
 +
    Configuration conf = new Configuration();
 +
    Job job = Job.getInstance(conf, "word count");
 +
    job.setJarByClass(WordCount.class);
 +
    job.setMapperClass(TokenizerMapper.class);
 +
    job.setCombinerClass(IntSumReducer.class);
 +
    job.setReducerClass(IntSumReducer.class);
 +
    job.setOutputKeyClass(Text.class);
 +
    job.setOutputValueClass(IntWritable.class);
 +
    FileInputFormat.addInputPath(job, new Path(args[0]));
 +
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
 +
    System.exit(job.waitForCompletion(true) ? 0 : 1);
 +
  }
 +
}
  
  
Line 212: Line 293:
  
  
'''note: Running the job will create the output folder, However for subsequent jobs be sure to delete the output folder else Hadoop or Spark will not run. This limitation is done to prevent existing output from being overwritten'''
+
'''Note:''' Running the job will create the output folder.<br/>
 +
For subsequent jobs '''be sure to delete the output folder''' else Hadoop or Spark will not run.<br/>
 +
This limitation exists to prevent existing output from being overwritten
 +
 
  
 
[[File:Dataproc-hadoop.jpg]]
 
[[File:Dataproc-hadoop.jpg]]
Line 262: Line 346:
  
  
'''note: Running the job will create the output folder, However for subsequent jobs be sure to delete the output folder else Hadoop or Spark will not run. This limitation is done to prevent existing output from being overwritten'''
+
'''Note:''' Running the job will create the output folder.<br/>
 +
For subsequent jobs '''be sure to delete the output folder''' else Hadoop or Spark will not run.<br/>
 +
This limitation exists to prevent existing output from being overwritten
  
 
== Results ==
 
== Results ==
Line 287: Line 373:
 
== Conclusion ==
 
== Conclusion ==
  
 +
[[File:Googlecloud-hdfs.jpg|thumb|upright=2|right|alt=Spark cluster|5.1 Spark vs Hadoop MapReduce]]
 
Using the same hardware (RAM, CPUs, HDD) across a 6 node cluster and processing the same data (8 .txt files for total size of 7.77 GB) we can only see an approximately 12% performance improvement between Hadoop Mapreduce and Spark using a word count algorithm. This falls far short of the 10 times faster on disk and 100 times faster in-memory.  
 
Using the same hardware (RAM, CPUs, HDD) across a 6 node cluster and processing the same data (8 .txt files for total size of 7.77 GB) we can only see an approximately 12% performance improvement between Hadoop Mapreduce and Spark using a word count algorithm. This falls far short of the 10 times faster on disk and 100 times faster in-memory.  
 +
 +
Spark does require more memory than Hadoop to cache data in memory, however that should not be a limitation in this case as the worker nodes have 15 GB of memory and none of the input files exceed 2GB. This is more than enough space for Spark to store input data in memory in the resilient distributed datasets (RDDs). It is worth noting that Spark performs best when iterating over the same data many times, while MapReduce was designed for single pass jobs. Furthermore typical uses cases likely involve hundreds to thousands of nodes with terabytes of data.
  
 
Further testing and analyzing Spark internal data could be done to determine if any bottlenecks exists which are limiting the performance of Spark. For example, how well is the cluster utilizing the hardware, namely the RAM?  
 
Further testing and analyzing Spark internal data could be done to determine if any bottlenecks exists which are limiting the performance of Spark. For example, how well is the cluster utilizing the hardware, namely the RAM?  
  
 
One possible explanation is the use of Google Cloud Storage Bucket to store the data rather than in Hadoop Distributed File System (HDFS). Both jobs are operating directly on data in the Cloud Strage rather than the HDFS. This may be reducing data access time for Hadoop MapReduce or introducing data access time to Apache Spark, as opposed to having the input data stored directly on the VM data nodes.
 
One possible explanation is the use of Google Cloud Storage Bucket to store the data rather than in Hadoop Distributed File System (HDFS). Both jobs are operating directly on data in the Cloud Strage rather than the HDFS. This may be reducing data access time for Hadoop MapReduce or introducing data access time to Apache Spark, as opposed to having the input data stored directly on the VM data nodes.
 
[[File:Googlecloud-hdfs.jpg]]
 
  
 
== Progress ==
 
== Progress ==
Line 319: Line 406:
 
# https://www.netjstech.com/2018/07/what-are-counters-in-hadoop-mapreduce.html
 
# https://www.netjstech.com/2018/07/what-are-counters-in-hadoop-mapreduce.html
 
# https://www.educative.io/edpresso/mapreduce
 
# https://www.educative.io/edpresso/mapreduce
 +
# https://phoenixnap.com/kb/hadoop-vs-spark

Latest revision as of 17:12, 30 November 2020

Group Members

  1. Akhil Balachandran
  2. Daniel Park

Project Description

Apache Spark logo.svg.png vs Hadooplogo.png

MapReduce was famously used by Google to process massive data sets in parallel on a distributed cluster in order to index the web for accurate and efficient search results. Apache Hadoop, the open-source platform inspired by Google’s early proprietary technology has been one of the most popular big data processing frameworks. However, in recent years its usage has been declining in favor of other increasingly popular technologies, namely Apache Spark.

This project will focus on demonstrating how a particular use case performs in Apache Hadoop versus Apache spark, and how this relates to the rising and waning adoption of Spark and Hadoop respectively. It will compare the advantages of Apache Hadoop versus Apache Spark for certain big data applications.

Apache Hadoop

Apache Hadoop is an open-source framework that allows for the storage and distributed processing of large data sets across clusters of computers using simple programming models. Hadoop is an implementation of MapReduce, an application programming model developed by Google. MapReduce has three basic operations: Map, Shuffle and Reduce. Map, where each worker node applies a map function to the local data and writes the output to temporary storage. Shuffle, where worker nodes redistribute data based on output keys such that all data belonging to one key is located on the same worker node. Finally reduce, where each worker node processes each group of output in parallel.

Hadoop cluster
3.1 MapReduce

Architecture

Hadoop has a master-slave architecture as shown in figure 3.2. A small Hadoop cluster consists of a single master and multiple worker nodes. The master node consists of a Job Tracker, Task Tracker, NameNode, and DataNode. A worker node acts as both a task tracker and a DataNode. A file on HDFS is split into multiple blocks and each block is replicated within the Hadoop cluster. NameNode is the master server while the DataNodes store and maintain the blocks. The DataNodes are responsible for retrieving the blocks when requested by the NameNode. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.

Hadoop cluster
3.2 A multi-node Hadoop Cluster

Components

Hadoop Common

The set of common libraries and utilities that other modules depend on. It is also known as Hadoop Core as it provides support for all other Hadoop components.

Hadoop Distributed File System (HDFS)

This is the file system that manages the storage of large sets of data across a Hadoop cluster. HDFS can handle both structured and unstructured data. The storage hardware can range from any consumer-grade HDDs to enterprise drives.

Hadoop YARN

YARN (Yet Another Resource Negotiator) is responsible for managing computing resources and job scheduling.

Hadoop MapReduce

The processing component of Hadoop ecosystem. It assigns the data fragments from the HDFS to separate map tasks in the cluster and processes the chunks in parallel to combine the pieces into the desired result.


Applications

  • In the healthcare sector for reducing medical costs, predicting and managing epidemics, and maintaining the quality of human life by keeping track of large-scale health index and metrics.
  • In the financial industry to assess risk, build investment models, and create trading algorithms
  • In the retail industry to analyze structured and unstructured data to understand and serve their customers, fraud detection and prevention, inventory forecasting, etc.
  • In the telecom industry to store massive volumes of communications data which provides certain benefits like network traffic analytics, infrastructure planning, etc.

Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing. It is an open-source, general-purpose cluster-computing framework that provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Since its inception, Spark has become one of the biggest big data distributed processing frameworks in the world. It can be deployed in a variety of ways, provides high-level APIs in Java, Scala, Python, and R programming languages, and supports SQL, streaming data, machine learning, and graph processing.

Architecture

Spark cluster
4.1 Spark Cluster components

One of the distinguishing features of Spark is that it processes data in RAM using a concept known as Resilient Distributed Datasets (RDDs) - an immutable distributed collection of objects which can contain any type of Python, Java, or Scala objects, including user-defined classes. Each dataset is divided into logical partitions which may be computed on different nodes of the cluster. Spark's RDDs function as a working set for distributed programs that offer a restricted form of distributed shared memory. Another important abstraction in Spark is Directed Acyclic Graph or DAG which is the scheduling layer that implements stage-oriented scheduling.


At a fundamental level, an Apache Spark application consists of two main components: a driver, which converts the user's code into multiple tasks that can be distributed across worker nodes, and executors, which run on those nodes and execute the tasks assigned to them. The processes are coordinated by the SparkContext object in the driver program. The SparkContext can connect to several types of cluster managers which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for the application. Next, it sends the application code to the executors and finally sends tasks to the executors to run.

Components

Spark cluster
4.2 Spark Stack

Spark Core

Spark Core is the basic building block of Spark, which includes all components for job scheduling, performing various memory operations, fault tolerance, task dispatching, basic input/output functionalities, etc.

Spark Streaming

Spart Streaming processes live streams of data. Data generated by various sources is processed at the very instant by Spark Streaming. Data can originate from different sources including Kafka, Kinesis, Flume, Twitter, ZeroMQ, TCP/IP sockets, etc.

Spark SQL

Spark SQL is a component on top of Spark Core that introduced a data abstraction called DataFrames, which provides support for structured and semi-structured data. Spark SQL allows querying data via SQL, as well as via Apache Hive's form of SQL called Hive Query Language (HQL). It also supports data from various sources like parse tables, log files, JSON, etc. Spark SQL allows programmers to combine SQL queries with programmable changes or manipulations supported by RDD in Python, Java, Scala, and R.

GraphX

GraphX is Spark's library for enhancing graphs and enabling graph-parallel computation. It is a distributed graph-processing framework built on top of Spark. Apache Spark includes a number of graph algorithms that help users in simplifying graph analytics.

MLlib (Machine Learning Library)

Spark MLlib is a distributed machine-learning framework on top of Spark Core. It provides various types of ML algorithms including regression, clustering, and classification, which can perform various operations on data to get meaningful insights out of it.

Applications

  • In the financial industry, to access and analyze social media profiles, call recordings, complaint logs, emails, forum discussions, etc. to gain insights which help them make the correct decisions for credit card assessment, targeted advertisement, etc.
  • In e-commerce, to provide targeted offers, enhance customer experience, and to optimize the overall performance. For example, e-commerce platforms like Alibaba and eBay utilize Spark to analyze massive amounts of customer data.
  • In the healthcare industry, to analyze patient records to predict the likelihood of facing health issues, perform genome sequencing, etc.
  • In the media and entertainment industry.
    • Yahoo uses Spark to personalize news webpages and for targeted advertising.
    • Netflix uses Spark for real-time stream processing to provide online recommendations to users.
    • Pinterest uses Spark to discover trends in user engagement data in order to react to developing trends in real-time.
  • In the travel industry. For example, TripAdvisor uses Spark to plan trips and provide personalized customer recommendations.

Comparison: Spark vs Hadoop MapReduce

Spark cluster
5.1 Spark vs Hadoop MapReduce

Performance

Spark processes data in RAM while Hadoop persists data back to the disk after a map or reduce action. Spark has been found to run 100 times faster in-memory, and 10 times faster on disk. Spark won the 2014 Gray Sort Benchmark where it sorted 100TB of data using 206 machines in 23 minutes beating a Hadoop MapReduce cluster's previous world record of 72 minutes using 2100 nodes.

Ease of Use

Spark is easier to program and includes an interactive mode. It has various pre-built APIs for Java, Scala, and Python. Hadoop MapReduce is harder to program but there are some tools available to make it easier.

Cost

According to benchmarks, Spark is more cost-effective as it requires less hardware to perform the same tasks faster.

Compatibility

Spark can run as a standalone application or on top of Hadoop YARN or Apache Mesos. Spark supports data sources that implement Hadoop input format, so it can integrate with all the same data sources and file formats that Hadoop supports.

Hadoop-vs-spark.png

Data Processing

In addition to plain data processing, Spark can also process graphs, and it also has the MLlib machine learning library. Due to its high performance, Spark can do both real-time and batch processing. However, Hadoop MapReduce is great only for batch processing.

Fault Tolerance

Both support retries per task and speculative execution. However, since Hadoop runs on disk, it is slightly more tolerant than Spark.

Security

Both Spark and Hadoop have access to support for Kerberos authentication, but Hadoop has more fine-grained security controls for HDFS.

Spark vs Hadoop Wordcount Performance

Methodology

Hadoop and Spark clusters can be deployed in cloud environments such as the Google Cloud Platform or Amazon EMR. The clusters are managed, scalable, and pay-per-usage and comparatively easier to setup and manage versus setting up a cluster locally on commodity hardware. We will use the Google Cloud Platform managed service to run experiments and observe possible expected performance differences between Hadoop and Spark.

Google-cloud-dataproc.png


  1. We will use the Google Cloud Platform Dataproc to deploy a 6 virtual machine (VM) nodes (1 master, 5 workers) cluster that is automatically configured for both Hadoop and Spark.
  2. Use Google Cloud Storage Connector which is compatible with Apache HDFS file system, instead of storing data on local disks of VMs.
  3. Store .jar and .py wordcount files and input data in the Cloud Storage Bucket
  4. Run a Dataproc Hadoop MapReduce and Spark jobs to count number of words in large text files and compare the performance between Hadoop and Spark in execution time.

What is Dataproc?

Dataproc is a managed Spark and Hadoop service that automates tasks for rapid cluster creation and management. Users can use cluster for larg-scale data processing with Spark or Hadoop with the same cluster. Virtual Machine (VM) nodes in the cluster create in minutes, with all node pre-configured and installed with Hadoop, Spark and other tools. Usage is charged by virtual CPU per hour, with standard and higher performance hardware configurations available at different rate.

Setting up Dataproc and Google Cloud Storage

Using a registered Google account navigate to the Google Cloud Console https://console.cloud.google.com/ and activate the free-trial credits. Googlecloud-setup-2.jpg

Create a new project by clicking the project link in the GCP console header. A default project of 'My First Project' is created by default

Once you are registered create the data cluster of master and slave nodes These nodes will come pre-configured with Apache Hadoop and Spark components.

Go to Menu -> Big Data -> DataProc -> Clusters

Googlecloud-setup-6b.jpg


We will create 5 worker nodes and 1 master node using the N1 series General-Purpose machine with 4vCPU and 15 GB memory and a disk size of 32-50 GB for all nodes. You can see the cost of your machine configuration per hour. Using machines with more memory, computing power, etc will cost more per hourly use.


Create a cluster with 1 standard master node and 5 worker nodes

  • Name your cluster and choose a region and zone
  • Select a low-cost machine configuration
    • I.e. General Purpose N1 4vCPU, 15 GB memory for all nodes
    • 32 GB Standard Persistent Disk

Googlecloud-setup-9.jpg


Allow API access to all google Cloud services in the project.

Googlecloud-dataproc-1.jpg


View the individual nodes in the cluster Go to Menu -> Virtual Machines -> VM Instances

Googlecloud-setup-11b.jpg


Ensure that Dataproc, Compute Engine, and Cloud Storage APIs are all enabled

  1. Menu -> API & Services -> Library.
  2. Search for the API name and enable them if they are not already enabled.


Create a Cloud Storage Bucket

  1. Menu -> Storage -> Browser -> Create Bucket
  2. Make a note of the bucket name.


Copy the Hadoop wordcount example available on every Dataproc cluster, from Master node VM to our Cloud Storage bucket

  1. Open Secure Shell (SSH) from VM Instances list: Menu -> Compute -> Compute Engine.
  2. To copy from the VM local disk to Cloud Storage bucket enter the following command in the shell:

gsutil cp /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar gs://<myBucketName>/


WordCount.class from Hadoop MapReduce examples

// WordCountCount.class
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

 public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable>{

   private final static IntWritable one = new IntWritable(1);
   private Text word = new Text();

   public void map(Object key, Text value, Context context
                   ) throws IOException, InterruptedException {
     StringTokenizer itr = new StringTokenizer(value.toString());
     while (itr.hasMoreTokens()) {
       word.set(itr.nextToken());
       context.write(word, one);
     }
   }
 }

 public static class IntSumReducer
      extends Reducer<Text,IntWritable,Text,IntWritable> {
   private IntWritable result = new IntWritable();

   public void reduce(Text key, Iterable<IntWritable> values,
                      Context context
                      ) throws IOException, InterruptedException {
     int sum = 0;
     for (IntWritable val : values) {
       sum += val.get();
     }
     result.set(sum);
     context.write(key, result);
   }
 }

 public static void main(String[] args) throws Exception {
   Configuration conf = new Configuration();
   Job job = Job.getInstance(conf, "word count");
   job.setJarByClass(WordCount.class);
   job.setMapperClass(TokenizerMapper.class);
   job.setCombinerClass(IntSumReducer.class);
   job.setReducerClass(IntSumReducer.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);
   FileInputFormat.addInputPath(job, new Path(args[0]));
   FileOutputFormat.setOutputPath(job, new Path(args[1]));
   System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}


Save the Spark wordcount example into the Cloud Storage bucket by dragging and dropping it into the storage browswer

  1. To open Browser: Menu -> Storage -> Browser
  2. Drag and drop the below word-count.py into the browser, or use 'UPLOAD FILES' to upload.


# word-count.py
#!/usr/bin/env python
import pyspark
import sys
if len(sys.argv) != 3:
 raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
inputUri=sys.argv[1]
outputUri=sys.argv[2]
sc = pyspark.SparkContext()
lines = sc.textFile(sys.argv[1])
words = lines.flatMap(lambda line: line.split())
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
wordCounts.saveAsTextFile(sys.argv[2])


Finally, add the input files containing the text the word count jobs will be processing

  • Go to Cloud Storage Bucket: Menu -> Storage -> Browser
  • Create a new folder 'input' and open it
  • Drag and Drop input files, or use 'UPLOAD FILES' or 'UPLOAD FOLDER'

Misc-use-8.jpg


Input files for word count

For this analysis we are using archive text files of walkthroughs from https://gamefaqs.gamespot.com/ The files range in size from 4MB to 2.8GB for a total size of 7.7 GB of plain text.

Googlecloud-wordcountfiles.jpg


Running the Hadoop MapReduce Job in Dataproc

Now that we have our project code, input files and Dataproc cluster setup we can proceed to run the Hadoop MapReduce and Spark wordcount jobs.


Run the Hadoop MapReduce Job

  1. Go to Menu -> Big Data -> Dataproc -> Jobs
  2. Select 'SUBMIT JOB' and name your job ID
  3. Choose Region that the cluster was created on
  4. Select your cluster
  5. Specify Hadoop as Job Type
  6. Specify JAR which contains the Hadoop MapReduce algorithm, give 3 arguments to wordcount, and submit job.

mapreduce jar:

     gs://<myBucketName>/hadoop-mapreduce-examples.jar

3 arguments:

     wordcount gs://<myBucketName>/inputFolder gs://<myBucketName>output


Note: Running the job will create the output folder.
For subsequent jobs be sure to delete the output folder else Hadoop or Spark will not run.
This limitation exists to prevent existing output from being overwritten


Dataproc-hadoop.jpg


Retrieve the Results

You can observe the progress of each of the map and reduce jobs in the Job output console. When the jobs have completed and all the input files have been processed, Hadoop provides counters, statistics on the executed Job Also you can navigate back the the Jobs tab to see the total Elapsed Time of the job.


Some counters of note:

  1. number of splits: splits from all input data. A split is the amount of data in one map task (Hadoop default block size is 128 MB)
  2. Launched map tasks: number of total map tasks. Note that it matches the number of splits
  3. Launched reduce tasks: number of total reduce tasks
  4. GS: Number of bytes read: the total number of bytes read from Google Cloud Storage by both map and reduce tasks
  5. GS: Number of bytes written: the total number of bytes writtenfrom Google Cloud Storage by both map and reduce tasks
  6. Map input records: number of records (words) processed by all map tasks
  7. Reduce output records: number of records (word) output by all reduce tasks.
  8. CPU time spent (ms): total CPU processing time by all tasks

Dataproc-hadoop-2.jpeg


To output the files to a .txt file

  1. Open the SSH for the Master VM node: Menu -> Compute -> Compute Engine -> VM Instances -> SSH (of 'm' master node) -> Open in Browser Window
  2. Run following command in the shell to aggregate the results into 'output.txt' file
gsutil cat gs://rinsereduce/output/* > output.txt

You can then download the output file from the VM local storage to your local machine Press the dropdown from the Gear icon in the SSH and select Download File


Running the Apache Spark Wordcount Job in Dataproc

Create and Submit Dataproc Job

  1. Go to Menu -> Big Data -> Dataproc -> Jobs
  2. Select 'SUBMIT JOB' and name your job ID
  3. Choose Region that the cluster was created on
  4. Select your cluster
  5. Specify PySpark Job Type
  6. Specify .py file which contains the Apache Spark wordcount algorithm
  7. Give 2 arguments to word-count.py; the input folder and the output folder

word-count.py:

     gs://<myBucketName>/hadoop-mapreduce-examples.jar

2 arguments:

     gs://<myBucketName>/inputFolder gs://<myBucketName>output


Note: Running the job will create the output folder.
For subsequent jobs be sure to delete the output folder else Hadoop or Spark will not run.
This limitation exists to prevent existing output from being overwritten

Results

Hadoop Counters

  • Number of splits: 66
  • Total input files to process: 8
  • GS: Number of MB read: 8,291
  • GS: Number of MB written: 138
  • Launched map tasks: 66
  • Launched reduce tasks: 19
  • Map input records (millions): 191.1
  • Map output records (millions): 1,237.1
  • Reduce input records (millions): 9.3
  • Reduce output records (millions): 3.6
  • CPU time spent (s): 3,597


Results2.jpg


Results.jpg

Conclusion

Spark cluster
5.1 Spark vs Hadoop MapReduce

Using the same hardware (RAM, CPUs, HDD) across a 6 node cluster and processing the same data (8 .txt files for total size of 7.77 GB) we can only see an approximately 12% performance improvement between Hadoop Mapreduce and Spark using a word count algorithm. This falls far short of the 10 times faster on disk and 100 times faster in-memory.

Spark does require more memory than Hadoop to cache data in memory, however that should not be a limitation in this case as the worker nodes have 15 GB of memory and none of the input files exceed 2GB. This is more than enough space for Spark to store input data in memory in the resilient distributed datasets (RDDs). It is worth noting that Spark performs best when iterating over the same data many times, while MapReduce was designed for single pass jobs. Furthermore typical uses cases likely involve hundreds to thousands of nodes with terabytes of data.

Further testing and analyzing Spark internal data could be done to determine if any bottlenecks exists which are limiting the performance of Spark. For example, how well is the cluster utilizing the hardware, namely the RAM?

One possible explanation is the use of Google Cloud Storage Bucket to store the data rather than in Hadoop Distributed File System (HDFS). Both jobs are operating directly on data in the Cloud Strage rather than the HDFS. This may be reducing data access time for Hadoop MapReduce or introducing data access time to Apache Spark, as opposed to having the input data stored directly on the VM data nodes.

Progress

  1. Nov 9, 2020 - Added project description
  2. Nov 20, 2020 - Added outline and subsections
  3. Nov 21, 2020 - Added content about Apache Spark
  4. Nov 26, 2020 - Added content about Hadoop and Spark
  5. Nov 28, 2020 - Added setup step for Google Cloud account setup
  6. Nov 29, 2020 - Added Testing setup and job execution for Dataproc Hadoop and Spark
  7. Nov 30, 2020 - Added Testing results and conclusion.

References

  1. https://hadoop.apache.org/
  2. https://spark.apache.org/
  3. https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html
  4. https://www.gigaspaces.com/blog/hadoop-vs-spark/
  5. https://logz.io/blog/hadoop-vs-spark
  6. https://en.wikipedia.org/wiki/Apache_Hadoop
  7. https://en.wikipedia.org/wiki/Apache_Spark
  8. https://www.dezyre.com/article/top-5-apache-spark-use-cases/271
  9. https://www.xplenty.com/blog/apache-spark-vs-hadoop-mapreduce
  10. https://cloud.google.com/dataproc/docs/tutorials/gcs-connector-spark-tutorial#python
  11. https://cloud.google.com/dataproc/docs/tutorials/gcs-connector-spark-tutorial#python
  12. https://www.netjstech.com/2018/07/what-are-counters-in-hadoop-mapreduce.html
  13. https://www.educative.io/edpresso/mapreduce
  14. https://phoenixnap.com/kb/hadoop-vs-spark