GPU621/Spark
Progress
Oct 18th
Group page created and presentation schedule has been submitted. Specific topics covering Apache Spark are TBD and will be outlined here in the next few days. Most likely will be a tutorial with example implementation.
Nov 13th
Presentation will be an introduction to the Spark technology, with a short overview of the main features of the framework as well as a quick tutorial on deploying a Spark standalone instance.
Nov 21st
Presentation was given today. The presentation is available at http://gpu621.nickscherman.com/. Presentation notes will be up shortly.
Nov 24th
Notes have been added: an introduction to Apache Spark and an elaboration of the presentation contents.
Reading
These notes supplement the presentation for those who want an in-depth walkthrough of the concepts and code. The presentation focused on an introduction to the technical and practical aspects of Spark, and these notes do the same.
SPARK
Spark is a Big Data framework for large-scale data processing. It provides an API centred on a data structure called the Resilient Distributed Dataset (RDD): a read-only, fault-tolerant multiset of data items distributed over a cluster of machines. High-level APIs are available for Scala, Java, Python, and R. This tutorial focuses on Python code for its simplicity and popularity.
HISTORY
Spark was developed in 2009 at UC Berkeley's AMPLab. It was open sourced in 2010 under the BSD license. As of this writing (November 2016), it's at version 2.0.2.
Spark is one of the most active projects in the Apache Software Foundation and one of the most popular open source big data projects overall. Spark had over 1000 contributors in 2015.
Version | Original release date | Latest version | Release date |
0.5 | 2012-06-12 | 0.5.1 | 2012-10-07 |
0.6 | 2012-10-14 | 0.6.1 | 2012-11-16 |
0.7 | 2013-02-27 | 0.7.3 | 2013-07-16 |
0.8 | 2013-09-25 | 0.8.1 | 2013-12-19 |
0.9 | 2014-02-02 | 0.9.2 | 2014-07-23 |
1.0 | 2014-05-30 | 1.0.2 | 2014-08-05 |
1.1 | 2014-09-11 | 1.1.1 | 2014-11-26 |
1.2 | 2014-12-18 | 1.2.2 | 2015-04-17 |
1.3 | 2015-03-13 | 1.3.1 | 2015-04-17 |
1.4 | 2015-06-11 | 1.4.1 | 2015-07-15 |
1.5 | 2015-09-09 | 1.5.2 | 2015-11-09 |
1.6 | 2016-01-04 | 1.6.3 | 2016-11-07 |
2.0 | 2016-07-26 | 2.0.2 | 2016-11-14 |
Components
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
The SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications. Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors. Finally, SparkContext sends tasks to the executors to run.
- Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and the executor side (tasks from different applications run in different JVMs).
- Data cannot be shared across different Spark applications (instances of SparkContext) without writing it to an external storage system.
- Spark is agnostic to the underlying cluster manager: as long as it can acquire executor processes, and these can communicate with each other, it can run. This means you can use a variety of different cluster managers to accomplish your parallelization task.
- The driver program must listen for and accept incoming connections from its executors throughout its lifetime (e.g., see spark.driver.port in the network config section). As such, the driver program must be network addressable from the worker nodes. Because the driver schedules tasks on the cluster, it should be run close to the worker nodes, preferably on the same local area network.
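As a rough analogy only, the driver and executor relationship described above can be sketched in plain Python with a thread pool. This is not Spark code: the names run_task and driver are hypothetical, and the pool merely stands in for executors that stay up for the whole application and run tasks in threads.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(partition):
    """Work done by one 'executor' thread: sum the squares in its partition."""
    return sum(x * x for x in partition)


def driver(data, num_tasks=4, num_workers=2):
    """Toy 'driver': split data into tasks, schedule them, combine results."""
    # Slice the data into num_tasks roughly equal partitions.
    partitions = [data[i::num_tasks] for i in range(num_tasks)]
    # The pool stands in for executors that live as long as the application.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_sums = list(pool.map(run_task, partitions))
    # Only the small partial results travel back to the driver.
    return sum(partial_sums)


total = driver(list(range(10)))
print(total)  # 285
```

In real Spark the executors are separate JVM processes on other machines, which is why the driver has to be reachable over the network.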
Resilient Distributed Dataset
The RDD is the primary data abstraction in Spark. Let's go over what each letter of the three-letter acronym means.
R (Resilient)
Fault tolerance: the ability to recompute missing or damaged partitions, mainly through what's called a lineage graph. An RDD lineage graph (aka RDD operator graph) is a graph of all the parent RDDs of an RDD. It is built as a result of applying transformations to the RDD and forms a logical execution plan. That plan is only executed when an action is called; actions are an essential property of Spark that we will cover shortly. The following transformations build up the lineage graph of r20:
val r00 = sc.parallelize(0 to 9)
val r01 = sc.parallelize(0 to 90 by 10)
val r10 = r00 cartesian r01
val r11 = r00.map(n => (n, n))
val r12 = r00 zip r01
val r13 = r01.keyBy(_ / 20)
val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)
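To make the idea of lineage more concrete, here is a minimal plain-Python sketch, not Spark's implementation. The ToyRDD class and its methods are hypothetical, and it only handles single-parent transformations such as map. Each dataset remembers its parent and the function that derives it, so lost data can be rebuilt by replaying the chain from the source.

```python
class ToyRDD:
    """Plain-Python stand-in for an RDD that remembers how it was derived."""

    def __init__(self, name, parents=(), compute=None, data=None):
        self.name = name
        self.parents = list(parents)   # datasets this one was derived from
        self.compute = compute         # function applied to the parent's rows
        self.data = data               # only set for source datasets

    def map(self, name, fn):
        # Record the transformation; nothing is computed yet.
        return ToyRDD(name, [self], lambda rows: [fn(r) for r in rows])

    def lineage(self):
        """Return the names of this dataset and all of its ancestors."""
        names = [self.name]
        for parent in self.parents:
            names.extend(parent.lineage())
        return names

    def materialize(self):
        """Rebuild the rows by replaying the lineage from the source."""
        if self.data is not None:
            return list(self.data)
        parent_rows = self.parents[0].materialize()
        return self.compute(parent_rows)


r00 = ToyRDD("r00", data=range(10))
r11 = r00.map("r11", lambda n: (n, n))
print(r11.lineage())          # ['r11', 'r00']
print(r11.materialize()[:3])  # [(0, 0), (1, 1), (2, 2)]
```

Because r11 stores no rows of its own, calling materialize() again after a failure simply recomputes them from r00.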
D (Distributed)
Describes how data resides on multiple nodes in a cluster across a network of machines. The data can be read from and written to distributed storage such as HDFS or S3 and, most importantly, can be cached in the memory of worker nodes for immediate reuse. Spark is designed as a framework that operates over a network infrastructure, so tasks are divided and executed across multiple nodes within a single SparkContext.
D (Dataset)
The dataset in an RDD is a collection of automatically partitioned data. Spark manages data using partitions, which help parallelize distributed data processing with minimal network traffic between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed, partitioned data, it creates partitions to hold the data chunks in order to optimize transformation operations.
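The partitioning idea can be illustrated with a short plain-Python sketch. The helper split_into_partitions is hypothetical and is not how Spark decides partition boundaries; it only shows a dataset being cut into chunks that can each be processed independently.

```python
def split_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size."""
    size, extra = divmod(len(items), num_partitions)
    partitions = []
    start = 0
    for i in range(num_partitions):
        # The first `extra` partitions take one additional element.
        end = start + size + (1 if i < extra else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


parts = split_into_partitions(list(range(10)), 3)
print(parts)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]

# Each partition is summed on its own; only the small results are combined.
print(sum(sum(p) for p in parts))  # 45
```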
RDD Essential Properties
An RDD has three essential properties and two optional properties.
1. A list of parent RDDs, i.e. the dependencies an RDD relies on for its records.
2. An array of partitions that the dataset is divided into.
3. A compute function to do a computation on partitions.
4. An optional partitioner that defines how keys are hashed and the pairs partitioned (for key-value RDDs).
5. Optional preferred locations (aka locality info), i.e. the hosts for a partition where the data will have been loaded.
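Property 4, the optional partitioner, can be sketched as follows. This is a plain-Python illustration with a hypothetical hash_partition helper, not Spark's own partitioner. Integer keys are used because Python randomizes string hashes between runs.

```python
def hash_partition(pairs, num_partitions):
    """Assign each (key, value) pair to a partition based on its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # Equal keys always map to the same partition index.
        index = hash(key) % num_partitions
        partitions[index].append((key, value))
    return partitions


pairs = [(1, "a"), (2, "b"), (3, "c"), (1, "d"), (4, "e")]
buckets = hash_partition(pairs, 2)
for index, part in enumerate(buckets):
    print(index, part)
# 0 [(2, 'b'), (4, 'e')]
# 1 [(1, 'a'), (3, 'c'), (1, 'd')]
```

Keeping all records for a key in one partition is what lets a per-key reduction run without moving data between partitions afterwards.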
RDD Functions
An RDD supports two kinds of operations: transformations and actions.
- A transformation is a lazy operation on an RDD that returns another RDD.
- An action is an operation that triggers computation and returns values to the Spark driver.
The essential idea is that the programmer specifies a transformation, or a series of transformations, to perform on a dataset and finally performs an action that returns new data as the result of those transformations. The new data can then be used for analysis or for further transformations. Transformations can be thought of as the start of a parallel region of code, and the action as the end of that region. Spark is designed to be as simple as possible to use, so partitions, threads etc. are generated automatically.
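The difference between lazy transformations and actions can be sketched in plain Python. The LazyDataset class below is hypothetical and runs serially; it only mimics the behaviour described above, where map and filter record work and collect or reduce actually perform it.

```python
from functools import reduce


class LazyDataset:
    """Toy dataset: transformations are recorded, actions run them."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)   # pending transformations, in order

    def map(self, fn):
        # Transformation: returns a new dataset, computes nothing.
        return LazyDataset(self.source, self.steps + (("map", fn),))

    def filter(self, fn):
        # Transformation: returns a new dataset, computes nothing.
        return LazyDataset(self.source, self.steps + (("filter", fn),))

    def collect(self):
        # Action: replay every recorded step and return the values.
        rows = list(self.source)
        for kind, fn in self.steps:
            if kind == "map":
                rows = [fn(r) for r in rows]
            else:
                rows = [r for r in rows if fn(r)]
        return rows

    def reduce(self, fn):
        # Action: compute, then fold the values into a single result.
        return reduce(fn, self.collect())


calls = []
squares = LazyDataset(range(5)).map(lambda x: calls.append(x) or x * x)
print(calls)                               # [] (nothing has run yet)
evens = squares.filter(lambda x: x % 2 == 0)
print(evens.collect())                     # [0, 4, 16]
print(squares.reduce(lambda a, b: a + b))  # 30
```

The empty calls list after the map call shows that the function was not invoked until an action asked for results.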
Advantages
The main advantage of Spark is that the data partitions are stored in memory, meaning that access to information is much faster than if the data were retrieved from a hard disk. This is also a disadvantage in some cases, as storing large datasets in memory requires a large amount of physical memory.
The other main advantage of Spark is its ease of use. The Spark API is available in several popular programming languages, and its design and syntax are geared towards those who have minimal experience with complex programming tasks and parallelization. Spark also features an interactive mode for each language that allows developers to experiment with different transformations etc. in the Spark shell.
Framework | Cores | Memory | Disks | Network |
Apache Spark | 8-16 | 8-100GB | 4-8 1TB | 10GB/s |
Apache Hadoop | 4 | 24GB | 4-6 1TB | 1GB/s |
The most often drawn comparison is between Spark and Hadoop MapReduce, and the table above compares the hardware figures for each. The most notable difference is that Spark requires large amounts of memory to hold its data partitions in memory, whereas Hadoop requires far less since it relies on its own distributed file system to persist the data. Spark is also reported to be up to 100x faster than Hadoop MapReduce for in-memory workloads, while maintaining the ability to use Hadoop's excellent HDFS file system as well as other data storage frameworks like Amazon Web Services S3.
Logistic Regression Performance Comparison
Programming Examples/Walkthrough
We'll now cover the following:
- Installation
- Calculate PI Example
- Word Count Example
- Network Context
The programming examples are based on the examples included with Spark and are available from my git repository here.
Installation
Spark is available for most UNIX (and OSX) platforms as well as Windows. Windows installation is more difficult since it usually requires building from source. This guide will simply cover installing Spark on Linux. If you want to follow along, you can install it on your local Linux laptop or desktop, or on Seneca Matrix, since the binaries can be executed from your home directory.
You can easily get the prebuilt Spark binaries from the Apache website here. Download the Spark binaries and extract the archive:
tar -xvf spark-2.0.2-bin-hadoop2.7.tgz
Unzipping the binaries
Below are the contents of the unzipped directory, followed by a diagram of its structure:
- bin - The Spark binaries used for executing Spark processes and running the shells for the various programming languages (e.g. pyspark for the Python shell).
- conf - Various templated configurations used for implementing clustering and scheduling, creating Docker containers etc.
- data - Data used in the numerous provided examples.
- jars - Java archives (libraries) to include in your Spark project.
- licenses - IP licenses for the various Spark components and the libraries used in the programming examples.
- python/R - Libraries and binaries for language-specific instances.
- sbin - Shell scripts for daemonizing a Spark instance and attaching to the various cluster managers (Mesos, etc.).
- examples - Numerous programming examples in the supported languages.
- yarn - Java jar for the YARN scheduler.
spark-2.0.2-bin-hadoop2.7
├── bin
│ ├── ...
├── conf
│ ├── ...
├── data
│ ├── ...
├── examples
│ ├── ...RandomForestClassificationExample.scala
├── jars
│ ├── ...
├── LICENSE
├── licenses
│ ├── ...
├── NOTICE
├── python
│ ├── docs
│ │ ├── ...
├── R
│ └── lib
│ ├── SparkR
│ │ ├── ...
├── README.md
├── RELEASE
├── sbin
│ ├── ...
└── yarn
└── ...
We will simply focus on the most basic aspect of running Spark, namely submitting a job to a new Spark scheduler using the default settings.
PI Example
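The example below approximates pi with a Monte Carlo method: it samples random points in a square and counts how many fall inside the unit circle.
from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession named "PythonPi".
spark = SparkSession\
    .builder\
    .appName("PythonPi")\
    .getOrCreate()

# The number of partitions comes from the first command line argument (default 2).
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions

def f(_):
    # Pick a random point in the square [-1, 1] x [-1, 1] and
    # return 1 if it falls inside the unit circle, otherwise 0.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

# map is the transformation; reduce is the action that triggers the computation.
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
spark.stop()

print('\n\n===============\n')
print("Pi is roughly %f" % (4.0 * count / n))
print('\n\n===============\n')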
We can see that the SparkSession builder creates a new SparkSession with the name "PythonPi". The number of partitions is specified through a command line argument. The function f() specifies the operation that approximates pi. The function f is passed to the map transformation, and count is returned as the result of the reduce action. Finally, the Spark session is stopped before the returned data is used, in this case simply to print the approximate result.
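To see why 4.0 * count / n approximates pi, it can help to run the same map and reduce pipeline serially without Spark. The sketch below is plain Python; the names inside_unit_circle and estimate_pi and the seed parameter are hypothetical additions for illustration. The unit circle covers pi/4 of the square that the points are drawn from, so the fraction of hits multiplied by 4 approaches pi as n grows.

```python
import random
from functools import reduce
from operator import add


def inside_unit_circle(_):
    """Return 1 if a random point in [-1, 1] x [-1, 1] lies in the unit circle."""
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0


def estimate_pi(n, seed=None):
    """Serial version of the map + reduce pipeline from the Spark example."""
    random.seed(seed)
    hits = reduce(add, map(inside_unit_circle, range(n)))
    # The circle covers pi/4 of the square, so pi is about 4 * hits / n.
    return 4.0 * hits / n


print("Pi is roughly %f" % estimate_pi(200000, seed=621))
```

In the Spark version the same function is applied to each partition in parallel, and only the per-partition counts are added together by the reduce action.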
Word Count
Below is a simple word count example, similar to GPU621 workshop 5, which used TBB. Compared to the TBB example, you might notice that it is much shorter. Though not simplistic, Spark tries to minimize the amount of code required to parallelize operations, and this is evident in this example. A step-by-step walkthrough follows the code:
from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession

if len(sys.argv) != 2:
    print("Usage: wordcount <file>", file=sys.stderr)
    sys.exit(-1)

spark = SparkSession\
    .builder\
    .appName("PythonWordCount")\
    .getOrCreate()

# Read the file as an RDD of lines (each row holds one line of text).
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
# Split lines into words, pair each word with 1, then sum the counts per word.
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
# collect() is the action that runs the job and returns the pairs to the driver.
output = counts.collect()

# Find the longest and the most frequent word serially on the driver.
longest_word_length = [0, '']
longestCount = [0, '']
for (word, count) in output:
    if longest_word_length[0] < len(word):
        longest_word_length = [len(word), word]
    if longestCount[0] < count:
        longestCount = [count, word]
    print("%s: %i" % (word, count))

spark.stop()

print("\n\n==============\n")
print("Longest word: \"%s\"" % (longest_word_length[1]))
print("Most numerous word: \"%s\" %i" % (longestCount[1], longestCount[0]))
print("\n\n==============\n")
The file that the operation is performed upon is passed in as an argument as a local file, though it could be a network accessible resource (e.g. available through HDFS). The file is loaded using the built-in spark.read.text function, which reads a text file. The flatMap transformation splits each line into words. Next, each word is mapped to a (word, 1) pair, and the reduceByKey transformation sums the pairs to count each repetition of a word. The output variable is then created using the collect() action. The longest word and the most frequent word are found by iterating through the words serially in vanilla Python. Finally, the Spark session is ended with spark.stop() and the results are printed.
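The three stages in the walkthrough can be traced on a tiny input with a serial, plain-Python equivalent. The word_count function below is a hypothetical illustration and does not use Spark; it mirrors flatMap, map, and reduceByKey one step at a time.

```python
from collections import defaultdict


def word_count(lines):
    """Serial equivalent of flatMap -> map -> reduceByKey."""
    # flatMap: one flat list of words from many lines.
    words = [word for line in lines for word in line.split(' ')]
    # map: pair every word with a count of 1.
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the counts that share a key.
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


counts = word_count(["to be or not", "to be"])
print(counts)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```

In Spark, each of these stages runs per partition, and reduceByKey brings the pairs for the same word together before adding them.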
Clustering
Clustering is beyond the scope of this introduction. From a programming perspective, it requires starting a new Spark session with a configuration that references a remote manager. The remote manager will handle clustering and assign the partitions to executors on different nodes automatically.
Connecting to a remote session might look like this. The master (manager) is specified as a command line flag:
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
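If a job needs to be submitted from a script, the same command can be assembled in Python. The helper below is a hypothetical sketch: build_submit_command is not part of Spark, it only builds the argument list using the flags shown above, and the master URL and jar path are the placeholder values from that command.

```python
def build_submit_command(master_url, app_jar, main_class, app_args=(),
                         executor_memory="20G", total_executor_cores=100):
    """Assemble the argument list for a cluster-mode spark-submit call."""
    command = [
        "./bin/spark-submit",
        "--class", main_class,
        "--master", master_url,
        "--deploy-mode", "cluster",
        "--supervise",
        "--executor-memory", executor_memory,
        "--total-executor-cores", str(total_executor_cores),
        app_jar,
    ]
    # Application arguments always follow the jar.
    command.extend(str(arg) for arg in app_args)
    return command


cmd = build_submit_command(
    "spark://207.184.161.138:7077",
    "/path/to/examples.jar",
    "org.apache.spark.examples.SparkPi",
    app_args=[1000],
)
print(" ".join(cmd))
```

The resulting list could be handed to subprocess.run from the Spark installation directory; that step is left out here because it needs a reachable cluster.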