Team Hortons

From CDOT Wiki

Once upon a time there was a team. In their programs, they never used floats. They only used double doubles.


Parallelism with Rust

Introduction: The Rust Programming language

Rust is a systems programming language (like C and C++, offering direct access to physical memory) sponsored by Mozilla and first announced in 2010. It is compiled, imperative, functional and strongly typed; it aims to provide better memory safety while maintaining performance similar to C++. Rust has no garbage collector: memory is released according to ownership rules checked at compile time, and reference counting is available as an opt-in through types such as Rc<T> and Arc<T>. Rust won first place as the "most loved programming language" in 2016 and 2017, in a survey from Stack Overflow [1] [2].

Rust places a very strong emphasis on safety, more specifically memory safety. Because of this, the same rules that the compiler enforces on every program end up solving many problems that are common in concurrent execution, such as data races. This project will explain what these safety rules are, how they can prevent problems in parallel programming, and the correct way to tackle concurrency with Rust.

Memory Ownership and Borrowing

Rust has a very peculiar way to deal with memory deallocation, security, and ownership. Understanding how memory management works is essential in order to understand how to use parallel programming in Rust. Consider the following example, where we create a vector, push some elements into it, and then print the elements:

fn main() {
  let mut vec = Vec::new();
  vec.push(0);
  vec.push(1);
  vec.push(2);

  for i in vec.iter() {
    println!("{}", i);
  }
}

Output:

0
1
2

Notice how variables in Rust are immutable (constant) by default - we need to manually specify that the vector can be changed. Failing to do so will result in a compilation error.

In the next example, we abstracted the "push" method to a function called "add_to_vec", which takes a vector and a number, and pushes the number into the vector:

fn add_to_vec(mut v: Vec<i32>, n: i32) {
  v.push(n);
}

fn main() {
  let mut vec = Vec::new();
  vec.push(0);
  vec.push(1);
  vec.push(2);

  add_to_vec(vec, 3);

  for i in vec.iter() {
    println!("{}", i);
  }
}

In a language like C++, a vector passed like this would be copied to the function and modified, leaving the original vector untouched; in a language like Java, similar logic would work, since objects are passed by reference by default. Rust, on the other hand, takes a different approach: sending the vector to a function like this transfers ownership of the object completely to that function. After the vector is sent to the function, it is no longer available to the main function: once add_to_vec ends, the vector is deallocated, so the for loop that follows is not allowed to use it.

If we try compiling this code, the compiler will produce an error:

error[E0382]: use of moved value: `vec`
  --> src/main.rs:13:12
   |
11 |   add_to_vec(vec, 3);
   |              --- value moved here
12 | 
13 |   for i in vec.iter() {
   |            ^^^ value used here after move
   |
   = note: move occurs because `vec` has type `std::vec::Vec<i32>`, which does not implement the `Copy` trait

To fix this, we will have to transfer the ownership of the vector back to the main function. We do this by returning the vector back and reassigning it to the variable vec:

fn add_to_vec(mut v: Vec<i32>, n: i32) -> Vec<i32> {
  v.push(n);
  v // shorthand for returning. notice the lack of ;
}

fn main() {
  let mut vec = Vec::new();
  vec.push(0);
  vec.push(1);
  vec.push(2);

  vec = add_to_vec(vec, 3); // receiving the ownership back

  for i in vec.iter() {
    println!("{}", i);
  }
}

Now the output is as expected:

0
1
2
3
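
When a copy really is what we want, as in the C++ behaviour described earlier, Rust requires it to be requested explicitly. The following is a minimal sketch of that idea; with_extra is a hypothetical helper name used only for illustration.

```rust
// Takes ownership of the vector, pushes a value, and hands it back.
fn add_to_vec(mut v: Vec<i32>, n: i32) -> Vec<i32> {
    v.push(n);
    v
}

// Hypothetical helper: leaves the caller's data untouched by
// passing an explicit copy to add_to_vec.
fn with_extra(original: &[i32], n: i32) -> Vec<i32> {
    add_to_vec(original.to_vec(), n)
}

fn main() {
    let vec = vec![0, 1, 2];
    let longer = with_extra(&vec, 3);

    // `vec` is still usable because only the copy was moved.
    println!("{:?} {:?}", vec, longer);
}
```

The copy costs an allocation, which is why Rust never makes it implicitly for types such as Vec.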

An easier way to deal with memory ownership is to lend the value instead of completely transferring control over it. This is called borrowing. To borrow a value, we use the & operator (&mut when the borrower needs to modify the value):

fn add_to_vec(v: &mut Vec<i32>, n: i32) {
  v.push(n);
}

fn main() {
  let mut vec = Vec::new();
  vec.push(0);
  vec.push(1);
  vec.push(2);

  add_to_vec(&mut vec, 3);

  for i in vec.iter() {
    println!("{}", i);
  }
}

This will also produce the correct output. In this case, we don't need to send the ownership of the vector back, because it was never transferred, only borrowed. There is one important detail: a mutable reference (&mut T) is only allowed to exist if no other reference to that value, mutable or not, is active. In other words: a value may have either one mutable reference or any number of immutable references at a time.
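
The borrowing rule can be seen in a small sketch. The function total is a hypothetical name introduced here; add_to_vec is the function from the example above.

```rust
// Reads through a shared reference; many of these may coexist.
fn total(v: &[i32]) -> i32 {
    v.iter().sum()
}

// Writes through a mutable reference; it must be the only active borrow.
fn add_to_vec(v: &mut Vec<i32>, n: i32) {
    v.push(n);
}

fn main() {
    let mut vec = vec![0, 1, 2];

    // Two shared borrows at the same time are fine.
    let a = &vec;
    let b = &vec;
    println!("{} {}", total(a), total(b));

    // `a` and `b` are not used past this point, so the mutable
    // borrow is allowed.
    add_to_vec(&mut vec, 3);
    println!("{}", total(&vec));

    // Uncommenting the next line would make the borrow held by `a`
    // overlap with the mutable borrow above, and the program would
    // no longer compile.
    // println!("{}", total(a));
}
```

This prints "3 3" followed by "6".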

From what was presented above, some problems can be identified when applying parallelism:

  • If memory ownership is transferred in function calls, we cannot transfer the ownership of a value to several threads at the same time: only one thread can own it
  • Only one mutable reference to a value can exist at a time, which means that we cannot spawn several threads with mutable references to the same value

These two factors limit the capability of conventional shared memory. For example, the following code would not compile:

use std::thread;

fn add_to_vec(v: &mut Vec<i32>, n: i32) {
  v.push(n);
}

fn main() {
  let mut vec = Vec::new();
  vec.push(0);
  vec.push(1);
  vec.push(2);

  let t1 = thread::spawn(move || {
    add_to_vec(&mut vec, 4);
  });

  add_to_vec(&mut vec, 3);

  for i in vec.iter() {
    println!("{}", i);
  }

  t1.join();
}

error[E0382]: use of moved value: `vec`
  --> src/main.rs:20:19
   |
13 |   let t1 = thread::spawn(move || {
   |                          ------- value moved (into closure) here
...
20 |   add_to_vec(&mut vec, 3);
   |                   ^^^ value used here after move
   |

Although this seems unnecessarily strict, it has a great advantage: data races are ruled out at compile time, which means that programs will be safer and less likely to present concurrency problems. To correctly use parallel programming in Rust, we can make use of tools such as channels, locks (Mutex), and atomically reference-counted pointers (Arc).
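
The rejected program above can also be repaired with ownership transfer alone: the thread takes the vector, modifies it, and returns it through its join handle. This is only a sketch; push_in_thread is a hypothetical helper name.

```rust
use std::thread;

fn add_to_vec(v: &mut Vec<i32>, n: i32) {
    v.push(n);
}

// Hypothetical helper: moves the vector into a thread, modifies it
// there, and gets it back when the thread is joined.
fn push_in_thread(mut vec: Vec<i32>, n: i32) -> Vec<i32> {
    let handle = thread::spawn(move || {
        add_to_vec(&mut vec, n);
        vec // ownership goes back to whoever joins the thread
    });
    handle.join().unwrap()
}

fn main() {
    let vec = vec![0, 1, 2];

    // The main thread cannot touch the vector until it is returned.
    let mut vec = push_in_thread(vec, 4);
    add_to_vec(&mut vec, 3);

    println!("{:?}", vec);
}
```

This prints [0, 1, 2, 4, 3]. The two pushes can never overlap, but they also never run in parallel, which is why the tools described next are needed.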

Channels

Instead of sharing memory, Rust encourages communication between threads in order to combine information. Channels, which are available in Rust's standard library, can be pictured as rivers: an object put into the river will be carried downstream. A channel has two parts: a sender and a receiver. The sender is responsible for putting information into the channel, which can then be retrieved by the receiver. When we ask a receiver for a message, we choose whether execution waits for one: the method recv blocks until a message arrives, while try_recv returns immediately, with an error if no message is available.
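
The difference between the two receiving methods can be sketched as follows. The helper receive_both_ways is a hypothetical name; channel, recv and try_recv are from std::sync::mpsc.

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::thread;

// Hypothetical helper: shows both receiving styles on one channel.
fn receive_both_ways() -> (Result<i32, TryRecvError>, i32) {
    let (sender, receiver) = mpsc::channel();

    // Nothing has been sent yet, so try_recv returns immediately
    // with an Empty error instead of waiting.
    let early = receiver.try_recv();

    thread::spawn(move || {
        sender.send(7).unwrap();
    });

    // recv blocks until the spawned thread has sent its value.
    let value = receiver.recv().unwrap();
    (early, value)
}

fn main() {
    let (early, value) = receive_both_ways();
    println!("{:?} {}", early, value);
}
```

This prints "Err(Empty) 7".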

The following example will spawn a thread, which will send a few strings down a channel which will be received by the main thread.

use std::thread;
use std::sync::mpsc;
use std::sync::mpsc::{Sender, Receiver};

fn main() {
  let (sender, receiver): (Sender<String>, Receiver<String>) = 
    mpsc::channel();

  thread::spawn(move || {
    let messages = vec![
      String::from("blue"),
      String::from("yellow"),
      String::from("green"),
      String::from("red"),
    ];

    for colour in messages {
      sender.send(colour).unwrap();
    }
  });

  for message in receiver {
    println!("Got {}", message);
  }

}

This produces the following output:

Got blue
Got yellow
Got green
Got red

As in the previous examples, the sender is moved into the closure executed by the thread, which means that it goes out of scope when the thread finishes. Once every sender is gone the channel closes, which is what ends the for loop over the receiver. To distribute the sender among several threads, it can be cloned: objects sent through a clone are still received by the same receiver, but each clone is a separate object whose ownership can be moved independently:

use std::thread;
use std::sync::mpsc;
use std::sync::mpsc::{Sender, Receiver};

static NUM_THREADS: i32 = 8;

fn main() {
  let mut vec = Vec::new();

  let (sender, receiver): (Sender<i32>, Receiver<i32>) = 
    mpsc::channel();

  for i in 0..NUM_THREADS {
    // cloning the sender so it can be exclusive to
    // the new thread
    let thread_sender = sender.clone();

    thread::spawn(move || {
      thread_sender.send(i).unwrap();
    });
  }

  for _ in 0..NUM_THREADS {
    vec.push(receiver.recv().unwrap());
  }

  for i in vec.iter() {
    println!("{}", i);
  }

}

Output (the order varies between runs, since the threads are scheduled independently):

1
3
0
2
5
6
7
4
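
The example above counts the messages it expects. An alternative sketch, assuming we would rather read until the channel closes, drops the original sender and iterates over the receiver; collect_from_threads is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: collects one value from each of `n` threads.
fn collect_from_threads(n: i32) -> Vec<i32> {
    let (sender, receiver) = mpsc::channel();

    for i in 0..n {
        let thread_sender = sender.clone();
        thread::spawn(move || {
            thread_sender.send(i).unwrap();
        });
    }

    // Drop the original sender; otherwise the iteration below would
    // never end, because one sender would still be alive.
    drop(sender);

    // Iteration stops once every clone has been dropped.
    let mut values: Vec<i32> = receiver.iter().collect();
    values.sort(); // arrival order varies, so sort for a stable result
    values
}

fn main() {
    println!("{:?}", collect_from_threads(8));
}
```

After sorting, this prints [0, 1, 2, 3, 4, 5, 6, 7] on every run.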

Locks (Mutex)

Rust also supports shared-state concurrency, which allows two or more threads to share some state (data) that they can all write to and read from. Sharing data between multiple threads can get complicated, since it introduces the hazard of race conditions. Imagine a situation where one thread grabs some data and attempts to change it while another thread is starting to read the same value: there is no way to predict whether the latter retrieves the updated data or gets hold of the old value. Shared-state concurrency therefore needs some guard that synchronizes access.

Mutex
Acquire/Release Lock Process

Access to shared state (the critical section) is typically synchronized through a mutex. A mutex (short for mutual exclusion) prevents race conditions by allowing only one thread to access some data at any given time. If a thread wants to read or write a piece of data, it must first ask for the mutex’s lock and wait until the lock is granted. The lock is a special data structure that keeps track of who currently has exclusive access to the data. You can think of a lock as a dedicated mechanism that guards the critical section.

Because of Rust's ownership rules, the data protected by a mutex can only be reached through its lock, so only one thread can access the data at any given time.

The snippet below demonstrates how to lock a mutex and access its data in Rust:

use std::sync::Mutex;

fn use_lock(mutex: &Mutex<Vec<i32>>) {
    // lock() blocks until the mutex is free and returns a guard
    let mut guard = mutex.lock().unwrap();
    let numbers = &mut *guard; // borrow the protected vector through the guard
    numbers.push(42); // change the data
}

Mutex is a generic type that wraps the shared data protected by the lock. It is important to remember that ownership of that data is transferred into the mutex when it is created.

The lock function blocks the current thread until the mutex becomes available. The data is unlocked automatically once the value returned by lock() goes out of scope (here, at the end of the function), so there is no need to release the lock manually.
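
The automatic release can be observed with the standard library's Mutex. This is a minimal sketch; push_locked is a hypothetical helper name.

```rust
use std::sync::Mutex;

// Hypothetical helper: pushes a value while holding the lock.
fn push_locked(mutex: &Mutex<Vec<i32>>, n: i32) {
    let mut guard = mutex.lock().unwrap();
    guard.push(n);
} // the guard is dropped here, which releases the lock

fn main() {
    // The vector is moved into the mutex when it is created.
    let mutex = Mutex::new(vec![1, 2]);

    push_locked(&mutex, 42);

    {
        // A guard can also be limited to an inner block.
        let guard = mutex.lock().unwrap();
        println!("{:?}", *guard);
    } // unlocked again

    // try_lock succeeds because no guard is alive at this point.
    println!("{}", mutex.try_lock().is_ok());
}
```

This prints "[1, 2, 42]" and then "true".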

RC and Atomic

In Rust, some data types are defined as “thread safe” while others are not. For example, the Rc<T> type, which is Rust’s reference-counted smart pointer, is considered unsafe to share across threads. This type keeps track of the number of references and increments or decrements the count each time a new reference is created or an old one is destroyed. Rc<T> has no mechanism to ensure that a change to the reference count cannot be interrupted by another thread. The snippet below demonstrates the issue and results in a compiler error:

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let mutex = Rc::new(Mutex::new(2));
    let mut handles = vec![];

    for _ in 0..4 {
        let mutex = mutex.clone();
        let handle = thread::spawn(move || {
            let mut num = mutex.lock().unwrap();

            *num *= 2;
            println!("Intermediate Result : {}", *num);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final Result: {}", *mutex.lock().unwrap());
}

This will produce an error:

error[E0277]: the trait bound `std::rc::Rc<std::sync::Mutex<i32>>: std::marker::Send` is not satisfied in `[closure@src/main.rs:11:32: 16:6 mutex:std::rc::Rc<std::sync::Mutex<i32>>]`
  --> src/main.rs:11:18
   |
11 |     let handle = thread::spawn(move || {
   |                  ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
   |
   = help: within `[closure@src/main.rs:11:32: 16:6 mutex:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
   = note: required because it appears within the type `[closure@src/main.rs:11:32: 16:6 mutex:std::rc::Rc<std::sync::Mutex<i32>>]`
   = note: required by `std::thread::spawn`

Luckily for us, Rust ships a thread-safe counterpart of this smart pointer called Arc<T>, which implements reference counting via atomic operations. Note that in serial code it makes much more sense to use the standard Rc<T> over Arc<T>, since atomic operations make the latter more expensive.

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let mutex = Arc::new(Mutex::new(2));
    let mut handles = vec![];

    for _ in 0..4 {
        let mutex = mutex.clone();
        let handle = thread::spawn(move || {
            let mut num = mutex.lock().unwrap();

            *num *= 2;
            println!("Intermediate Result : {}", *num);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Final Result: {}", *mutex.lock().unwrap());
}

This will produce the expected result:

Intermediate Result : 4
Intermediate Result : 8
Intermediate Result : 16
Intermediate Result : 32
Final Result: 32

This time the code worked. This example was very simple and not too impressive. Much more complicated algorithms can be implemented with the Mutex<T>.
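
As a slightly larger sketch of the same Arc and Mutex pattern, the function below splits a summation across threads. The name shared_sum and the way the work is divided are assumptions made for illustration only.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: each of `threads` threads adds its share of
// the numbers 1..=limit to a shared total.
fn shared_sum(threads: u64, limit: u64) -> u64 {
    let total = Arc::new(Mutex::new(0u64));
    let mut handles = vec![];

    for t in 0..threads {
        let total = Arc::clone(&total); // bumps the atomic reference count
        handles.push(thread::spawn(move || {
            // Sum locally first so the lock is taken once per thread.
            let local: u64 = (1..=limit).filter(|n| n % threads == t).sum();
            *total.lock().unwrap() += local;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Every clone was dropped with its thread; one owner remains.
    assert_eq!(Arc::strong_count(&total), 1);
    let result = *total.lock().unwrap();
    result
}

fn main() {
    println!("{}", shared_sum(4, 100));
}
```

This prints 5050. Keeping the locked section down to a single addition per thread matters, because threads waiting on the mutex are not doing useful work.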

Rayon Library

Introduction

Rust users and the community are heavily involved in the development of libraries that extend the language. Although the core features offer quite a few programming tools on their own, there is still demand for more complex functionality with a higher-level API. Rust has its own package registry, crates.io, that stores libraries for many needs. These packages range from networking tools and web services to image processing and multi-threading libraries. One package is of particular interest to parallel programming enthusiasts: a data-parallelism library called Rayon. The package is praised for being extremely lightweight, and it makes it easy to convert a sequential computation into a parallel one. It also abstracts away some of Rust's threading boilerplate and makes coding parallel tasks a bit more fun. Just like TBB, it can be used with iterators, where iterator chains are executed in parallel. It also supports a join model and provides ways to convert recursive, divide-and-conquer style problems into parallel implementations.

Parallel Iterators

Rayon offers an API called “parallel iterators”. The example below demonstrates a parallel sum-of-squares function:

use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()
         .map(|&i| i * i)
         .sum()
}
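
To give a feel for what the parallel iterator saves us from writing, here is a rough standard-library-only sketch of the same computation. It is not how Rayon is implemented: it spawns one scoped thread per chunk instead of using a thread pool, and both the function name sum_of_squares_chunked and the parts parameter are hypothetical.

```rust
use std::thread;

// Hypothetical std-only counterpart of the Rayon example: splits the
// input into at most `parts` chunks and sums the squares of each
// chunk in its own scoped thread.
fn sum_of_squares_chunked(input: &[i32], parts: usize) -> i32 {
    // Round up so every element lands in some chunk; never zero.
    let parts = parts.max(1);
    let chunk_len = ((input.len() + parts - 1) / parts).max(1);

    thread::scope(|scope| {
        let handles: Vec<_> = input
            .chunks(chunk_len)
            .map(|chunk| scope.spawn(move || chunk.iter().map(|&i| i * i).sum::<i32>()))
            .collect();

        // Combine the partial sums once every thread has finished.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<i32>()
    })
}

fn main() {
    let numbers: Vec<i32> = (1..=10).collect();
    println!("{}", sum_of_squares_chunked(&numbers, 4));
}
```

This prints 385. Scoped threads are allowed to borrow the input slice because the scope guarantees that they finish before the function returns.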

Using Join for Divide-and-Conquer Problems

Parallel iterators are built on a more primitive function called join, which takes two closures and potentially runs them in parallel. The code snippet below demonstrates a recursive increment algorithm:

fn increment_all(slice: &mut [i32]) {
    if slice.len() < 1000 {
        for p in slice { *p += 1; }
    } else {
        let mid_point = slice.len() / 2;
        let (left, right) = slice.split_at_mut(mid_point);
        rayon::join(|| increment_all(left), || increment_all(right));
    }
}
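
The same divide-and-conquer shape can be sketched with the standard library alone, assuming scoped threads as a stand-in for join. Unlike Rayon, this version starts a new operating system thread at every split, so it only illustrates the structure; increment_all_std is a hypothetical name.

```rust
use std::thread;

// Hypothetical std-only variant of increment_all: the left half is
// handled by a scoped thread, the right half by the current thread.
fn increment_all_std(slice: &mut [i32]) {
    if slice.len() < 1000 {
        for p in slice {
            *p += 1;
        }
    } else {
        let mid_point = slice.len() / 2;
        // split_at_mut yields two non-overlapping mutable halves,
        // so each can be handed to a different thread safely.
        let (left, right) = slice.split_at_mut(mid_point);
        thread::scope(|scope| {
            scope.spawn(|| increment_all_std(left));
            increment_all_std(right);
        }); // the scope waits for the spawned thread before returning
    }
}

fn main() {
    let mut data = vec![0; 4000];
    increment_all_std(&mut data);
    println!("{}", data.iter().all(|&x| x == 1));
}
```

This prints "true".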

How it works

Rayon borrowed the concept of work stealing from the Cilk project. The library attempts to determine dynamically how much parallelism is available. The idea is very simple: there is always a pool of worker threads available, waiting for some work to do. When you call join for the first time, execution shifts over into that pool of threads.

Rust Join1.png

But if you call join(a, b) from a worker thread W, then W will place b into its work queue, advertising that this is work that other worker threads might help out with. W will then start executing a. While W is busy with a, other threads might come along and take b from its queue. That is called stealing b.

Rust Join 2.png

Once a is done, W checks whether b was stolen by another thread and, if not, executes b itself. If W runs out of jobs in its own queue, it will look through the other threads' queues and try to steal work from them.

Rust Join 3.png
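
The steps described above can be modelled in a toy sketch. It is not Rayon's implementation: the "queue" is a single slot protected by a Mutex, there is exactly one thief, and the name join_with_stealing is hypothetical. It only shows the order of events: W advertises b, runs a, and then checks whether b is still there.

```rust
use std::sync::Mutex;
use std::thread;

// Toy model of join(a, b) for worker W and one would-be thief.
// Returns both results and a label saying who ended up running b.
fn join_with_stealing(a: fn() -> u32, b: fn() -> u32) -> (u32, u32, &'static str) {
    // W's "queue": holds b until somebody takes it.
    let queue: Mutex<Option<fn() -> u32>> = Mutex::new(Some(b));

    thread::scope(|scope| {
        // Another worker looks at W's queue and steals b if it is
        // still there.
        let thief = scope.spawn(|| {
            let stolen = queue.lock().unwrap().take();
            stolen.map(|job| job())
        });

        // Meanwhile W executes a.
        let ra = a();

        // Once a is done, W checks whether b was stolen; if not,
        // W runs b itself.
        let leftover = queue.lock().unwrap().take();
        match leftover {
            Some(job) => {
                let rb = job();
                thief.join().unwrap(); // the thief found the queue empty
                (ra, rb, "owner")
            }
            None => (ra, thief.join().unwrap().unwrap(), "thief"),
        }
    })
}

fn main() {
    let (ra, rb, who) = join_with_stealing(|| 20, || 22);
    println!("a = {}, b = {}, b was run by the {}", ra, rb, who);
}
```

The two results are always the same, but which thread runs b depends on timing, which is exactly the property that lets idle workers pick up work without any up-front planning.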

Group Members

  1. Henrique Salvadori Coelho
  2. Olga Belavina

Sources

Progress

-5%