Fearless concurrent

Concurrency generally refers to:

  • ConcurrentIndependent execution between different parts of a program
  • ParallelDifferent parts of the program run simultaneously

Rust concurrency: Allows you to write code that is bug-free and does not make it easy to refactor if new bugs are introduced.

Processes and threads

In most OSS, code runs in processes, and the OS manages multiple processes simultaneously.

In your program, separate parts can run at the same time, and those separate parts are run by threads.

Although multithreading can improve performance, it also increases complexity and cannot guarantee the execution order of each thread.

Multithreading can cause problems:

  • A race state in which threads access data or resources in an inconsistent order
  • A deadlock occurs when two threads wait for each other to run out of resources and the thread cannot continue
  • Bugs that only occur under certain circumstances are difficult to reliably replicate and fix

How to implement threads:

  • Threads are created by calling OS apis: 1:1 model, requiring a smaller runtime
  • The language’s own implementation of threads (green threads) : M:N model, requires a larger runtime

Rust needs to trade off runtime support, and Rust’s standard library only provides threads in a 1:1 model.

Thread ::spawn Creates the thread

Directly on the code:

use std::{thread, time::Duration};

fn main() {
    thread::spawn(|| { / / closures
        for i in 1.10 {
            println!("number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1)); }});for i in 1.5 {
        println!("number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1)); }}Copy the code

The closure content in this code is never finished, because the main program is finished, and the output may not be the same each time.

The thread::spawn function returns a JoinHandle. To resolve this problem, wait for all threads to complete in the JoinHandle.

The JoinHandle holds ownership of the value, and calling its Join method blocks execution of the current thread until the threads represented by the Handle terminate, thus waiting for the corresponding threads to complete.

use std::{thread, time::Duration};

fn main() {
    let handle = thread::spawn(|| {
        for i in 1.10 {
            println!("number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1)); }});for i in 1.5 {
        println!("number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}
Copy the code

At this point the thread can finish executing.

Use a Move closure

The move closure is usually used with the Thread ::spawn function, which allows you to use data from other threads. When a thread is created, ownership of a value is transferred from one thread to another.

use std::thread;

fn main() {
    let v = vec![1.2.3];

    let handle = thread::spawn(move| | {println!("Here's a vector: {:? }", v);
    });

    handle.join().unwrap();
}
Copy the code

If you don’t add move, this code will fail because the lifetime of the V may be shorter than that of the closure and will be dropped before the closure can point to it.

The messaging

Messaging is a popular and secure concurrency technique. Threads communicate by sending messages to each other.

Channel

A Channel is provided by the standard library and is divided into a sender and a receiver. The sender’s method is called to send data, and the receiver checks and receives incoming data. If either end is discarded, the Channel is “closed”.

create

Create a channel using the MPSC ::channel function, which stands for multiple producer, single consumer. Calling this function returns a tuple, with the first element being the sender and the second element being the receiver.

use std::{thread, sync::mpsc};

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move| | {let val = String::from("hi");
        tx.send(val).unwrap(); // The ownership of val will be transferred
    });

    let received = rx.recv().unwrap();

    println!("Got: {}", received);
}
Copy the code

The send method on the sender, which takes the data you want to send, returns Result

, or an error if there is a problem (for example, the receiver has been dropped).
,>

Methods at the receiving end:

  • recvMethod: Block the current thread from executing untilChannelThe median is sent in and returned as soon as any value is receivedResult<T, E>When the sender is closed, an error is received.
  • try_recvMethod: Does not block and returns immediatelyResult<T, E>Return as data arrivesOk, which also contains data. Otherwise, an error is returned. A circular call is usually used to checktry_recvResults.

Channel and transfer of ownership

Ownership is very important in messaging: it helps you write secure, concurrent code.

The send method in the above example has the transfer ownership operation.

Send multiple values and see that the recipient is waiting

use std::{thread, sync::mpsc, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();
    
    thread::spawn(move| | {let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread")];for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1000)); }});for received in rx {
        println!("Got: {}", received); }}Copy the code

Create multiple senders by cloning

use std::{thread, sync::mpsc, time::Duration};

fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);
    
    thread::spawn(move| | {let vals = vec![
            String::from("1:hi"),
            String::from("1:from"),
            String::from("1:the"),
            String::from("1:thread")];for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_millis(1000)); }}); thread::spawn(move| | {let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread")];for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(1000)); }});for received in rx {
        println!("Got: {}", received); }}Copy the code

Concurrency in shared state

Whereas earlier messaging shared memory through communication, Rust also supports concurrency through shared state.

A Channel is like single ownership in that once the ownership of a value is moved to a Channel, it is no longer usable, whereas shared memory is like multiple ownership in that multiple threads can access the same memory block at the same time.

Mutex

Mutex stands for “Mutual Exclusion.” Mutex allows only one thread to access some data at a time. To access the data, the thread must first acquire a Mutex lock.

The Lock data structure is part of MUtex and keeps track of who has exclusive access to the data.

Mutex is often described as protecting the data it holds by locking down the system

Mutex usage rules

  • Before using the data, you must try to acquire the lock (lock)
  • In the use of themutexAfter the data is secured, the data must be unlocked so that other threads can acquire the lock

Mutex API < T >

Mutex

is created by Mutex::new(data), which is a smart pointer.

Using the lock method to acquire the lock before accessing the data blocks the current thread, the lock may fail, and the MutexGuard (smart pointer that implements Deref and Drop) is returned.

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:? }", m);
}
Copy the code

Lock method locks, out of scope will be customized unlock.

Mutex<T>

Let’s start with an example of a mistake:

use std::{sync::Mutex, thread};

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    // Create ten threads
    for _ in 0.10 {
        let handle = thread::spawn(move| | {/ / complains
            let mut num = counter.lock().unwrap();

            *num += 1;
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Copy the code

The reason for the error is that the ownership of counter has been moved during the first loop and subsequent threads cannot take ownership.

We learned that Rc

can have multiple owners for a value, but Rc

still returns an error because it does not implement the send trait, so it cannot be passed safely between threads.

In concurrent scenarios, Arc

can be used for atomic reference counting. Arc

is similar to Rc

in that its API is almost identical. A is for atomic, which means atom. Arc

is not used by default in the Arc

library, and the base types are not atomic because of the performance trade-off.




use std::{sync::{Mutex, Arc}, thread};

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    // Create ten threads
    for _ in 0.10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move| | {let mut num = counter.lock().unwrap();

            *num += 1;
        });

        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}
Copy the code

RefCell<T>/Rc<T> VS Mutex<T>/Arc<T>

  • Mutex<T>Provides internal variability andCellThe family as
  • We use theRefCell<T>To change theRc<T>What’s inside
  • We use theMutex<T>To change theArc<T>What’s inside
  • Note:Mutex<T>There is deadlock risk

The Send and Sync trait

Rust speech has few concurrency features, and most of the concurrency features currently available come from the standard library (rather than the language itself). We don’t need to be limited to the concurrency of the standard library, we can implement concurrency ourselves.

But there are two concepts of concurrency, or traits, in Rust:

  • std::marker::Sync
  • std::marker::Send

Note: Manually implementing Send and Sync is not safe.

Send

  • Allows transfer of ownership between threads
  • Almost all types are implemented in RustSend, butRc<T>Do not implementSend, so it is only suitable for single-threaded scenarios
  • Anything wholly owned bySendTypes composed of types are also marked asSend
  • With the exception of primitive Pointers, almost all the base types areSend

Sync

  • implementationSyncType can be safely referenced by multiple threads
  • In other words: ifTSync, then&TisSendReferences can be safely sent to another thread
  • The base types are bothSync
  • By completelySyncSo are types composed of typesSync, butRc<T>notSyncThe,RefCell<T>Cell<T>The family is notSync, andMutex<T>To achieve theSync