Note: This article is a transcript of the video talk; editing may have introduced errors, and corrections are welcome.

Lecturer: Lai Zhichao, Blockchain Architect of Onchain

Video address: www.bilibili.com/video/BV1Yy…

Post editing: Li Dongjie (alias Qi Ji), Taobao technology department, Alibaba.

Introduction

Hi, today I’d like to share with you Rust’s asynchronous model and some of the concurrency problems that come up when implementing it. First, let me introduce how Rust is used in our company. We were an early blockchain developer and have been established for more than four years. Our core technology stack is Golang, but we are actively exploring Rust and already have some practical applications. First, our blockchain supports a WASM virtual machine, and we used Rust to implement a JIT version based on Cranelift/Wasmtime, which has now been running for more than a year. On top of the WASM virtual machine, we are also building smart contracts and the supporting toolchain. Rust is currently the team’s preferred language for smart contract development, thanks to its high development efficiency and fast iteration speed; as of a few days ago, our Rust smart contract code had reached 100,000 lines. We also use Rust for our cryptographic library.

  1. Blockchain WASM JIT virtual machine: based on Cranelift/Wasmtime;
  2. Smart contract development library and supporting tool chain: Currently Rust is preferred for contract development, with high development efficiency and fast iteration speed;
  3. Cryptographic library;

A multi-threaded pool for synchronous tasks

To set up the asynchronous programming model, let’s first look at the familiar multi-threaded pool for synchronous tasks. A typical implementation is shown on the left of the slide: there is a global task queue, tasks are pushed into it by calling spawn, and the queue is consumed by one or more worker threads, each of which takes tasks out of the global queue and executes them. The code is fairly simple:

use std::thread;
use crossbeam::channel::{unbounded, Sender};
use once_cell::sync::Lazy;

type Task = Box<dyn FnOnce() + Send + 'static>;

static QUEUE: Lazy<Sender<Task>> = Lazy::new(|| {
    let (sender, receiver) = unbounded::<Task>();
    for _ in 0..4 {
        let recv = receiver.clone();
        thread::spawn(move || {
            for task in recv {
                task();
            }
        });
    }
    sender
});

fn spawn<F>(task: F) where F: FnOnce() + Send + 'static {
    QUEUE.send(Box::new(task)).unwrap();
}

On line 5 we define what a synchronous task is: a synchronous task only needs to be executed once, so it is FnOnce(); since tasks are pushed from a user thread into the global queue and cross over to worker threads, it also needs a Send bound and a 'static lifetime; finally it is wrapped in a Box. Starting at line 8 we build the concurrent queue and four worker threads; each thread gets a clone of the queue’s receiving end and executes tasks in a loop (executing a task may, of course, panic, which is not handled here). On line 17 the sender is stored in the global static variable QUEUE; when a user calls spawn, it gets the QUEUE and calls send to push the task onto the queue.

A multi-threaded pool for asynchronous tasks

type Task = Box<dyn FnMut() -> bool + Send + 'static>;

Now let’s look at a multi-threaded pool for asynchronous tasks. First, we define an asynchronous task as a task that cannot complete immediately and must be executed multiple times. FnOnce() is therefore no longer sufficient; we use FnMut instead, returning a Boolean that indicates whether the task has completed. The problem with this definition is that when a task returns false, the worker thread doesn’t know what to do next: if it waits until the task is ready, no other task in the global queue gets executed, but it can’t just throw the task away either. Rust’s design here is clever: the Executor doesn’t care when the task will be ready. It creates a Waker at execution time and tells the task, “when you’re ready, use this Waker to put yourself back on the global queue,” so that it gets executed again. This adds a Waker parameter to the Task definition, as shown below:

type Task = Box<dyn FnMut(&Waker) -> bool + Send + 'static>;

This way, when an asynchronous task is not ready, it can register its Waker with a Reactor that monitors the task’s status, such as an epoll-based IO reactor or a timer. When the Reactor finds that the task is ready, it calls the Waker to put the task back on the global queue.
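To make this concrete, below is a minimal sketch of a timer “Reactor” (my own illustration, not from the slides or any real crate): a thread that accepts (deadline, Waker) registrations over a channel and calls wake once each deadline has passed.

use std::sync::mpsc::{channel, Sender};
use std::task::Waker;
use std::thread;
use std::time::Instant;

// A toy timer Reactor: tasks register (deadline, Waker) pairs, and the
// reactor thread wakes each task once its deadline passes. Handling
// deadlines strictly in arrival order keeps the sketch short.
fn spawn_timer_reactor() -> Sender<(Instant, Waker)> {
    let (tx, rx) = channel::<(Instant, Waker)>();
    thread::spawn(move || {
        for (deadline, waker) in rx {
            let now = Instant::now();
            if deadline > now {
                thread::sleep(deadline - now);
            }
            waker.wake(); // puts the task back on the executor's queue
        }
    });
    tx
}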

Multithreaded Executor for asynchronous tasks

In Rust, the standard definition of an asynchronous computation is the Future trait:

pub enum Poll<T> {
    Ready(T),
    Pending,
}

pub trait Future {
    type Output;
    // Simplified for this talk; the real signature in std is:
    // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
    fn poll(&mut self, cx: &Waker) -> Poll<Self::Output>;
}

The poll method returns the Poll enum, which is like returning a Boolean but with clearer semantics: Pending if the task is not ready, Ready if it is. The standard library doesn’t actually use &mut self; it uses Pin<&mut Self>. Explaining Pin would take more time than this 30-minute talk allows, so I’ll skip it here. Below is the model diagram for the whole multi-threaded pool for asynchronous tasks:

The user pushes an asynchronous task onto the global queue with the spawn function. A worker thread takes the task, creates a Waker, and passes it to the Future’s poll. If the task completes, we’re done. If not, the Future registers the Waker with a Reactor, and the Reactor listens for events; when the event fires, the Reactor calls the Waker, which puts the task back on the global queue. This cycle repeats until the task completes.
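As a minimal end-to-end illustration (my own sketch, using std’s real Future signature rather than the simplified one above), here is a leaf future that is Pending on the first poll, hands its Waker to a one-off timer thread, and becomes Ready after that thread calls wake:

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

// A leaf future: Pending on the first poll, Ready after a timer
// thread fires. The timer thread plays the role of the Reactor here.
struct Delay {
    fired: Arc<AtomicBool>,
    started: bool,
}

impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.fired.load(Ordering::Acquire) {
            return Poll::Ready(());
        }
        if !self.started {
            self.started = true;
            let fired = self.fired.clone();
            // Clone the Waker and hand it to the "Reactor".
            let waker = cx.waker().clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(100));
                fired.store(true, Ordering::Release);
                waker.wake(); // put the task back on the global queue
            });
        }
        // Note: this sketch keeps only the first Waker; the "stale
        // Waker" problem this causes is discussed later in this talk.
        Poll::Pending
    }
}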

Requirements for the Waker interface

Waker plays an important role in this process. Let’s take a look at what requirements Waker’s interface needs to meet:

impl Waker {
    pub fn wake(self);
}

impl Clone for Waker;

impl Send for Waker;

impl Sync for Waker;

From the user’s point of view: first, the essence of a Waker is a wake-up function, so it provides a wake method. An asynchronous task may care about multiple event sources, such as timers and IO, which means one Waker may correspond to several Reactors; since poll only receives a single Waker but it must be registered with multiple Reactors, Waker needs to be Clone. Next, the Executor and the Reactors may not be on the same thread, and the Waker has to be sent across threads to the Reactors, so a Send bound is required. Finally, the same Waker may be called by multiple event sources at once, so concurrent invocation must be safe, which requires a Sync bound. These are the requirements on the user side of Waker.

impl Waker {
    pub unsafe fn from_raw(waker: RawWaker) -> Waker;
}

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

pub struct RawWakerVTable {
    clone: unsafe fn(*const ()) -> RawWaker,
    wake: unsafe fn(*const ()),
    wake_by_ref: unsafe fn(*const ()),
    drop: unsafe fn(*const ()),
}

While different Executors have different internal implementations, Waker is a common, unified API. Some Executors have a global queue, some have thread-local queues, and some may only support running a single task, so their wake-up mechanisms are completely different. Constructing a unified Waker therefore involves polymorphism, and Rust uses a custom virtual table for this: RawWaker holds a data pointer and a static vtable, and each Executor implements the methods in this vtable.
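For illustration, here is a minimal sketch of building a Waker from a RawWaker with a hand-written vtable. This one does nothing on wake; in a real Executor the data pointer would reference the task handle, and wake would push that task back onto the run queue.

use std::ptr;
use std::task::{RawWaker, RawWakerVTable, Waker};

// Each vtable function receives the data pointer. A real executor
// would treat it as a reference-counted task handle.
unsafe fn vt_clone(data: *const ()) -> RawWaker {
    RawWaker::new(data, &VTABLE)
}
unsafe fn vt_wake(_data: *const ()) { /* re-schedule the task here */ }
unsafe fn vt_wake_by_ref(_data: *const ()) { /* same, without consuming */ }
unsafe fn vt_drop(_data: *const ()) { /* release the task handle here */ }

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(vt_clone, vt_wake, vt_wake_by_ref, vt_drop);

fn noop_waker() -> Waker {
    unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &VTABLE)) }
}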

Concurrency issues to consider for the Waker implementation

Waker implementations have to deal with some concurrency problems. The first is concurrency between wake calls: we must ensure that the task is pushed onto the queue only once. If two (or more) Reactors call Waker::wake at the same time and both pushes succeed, the task sits in the global queue twice. Worker thread A may then take the first copy while worker thread B takes the second, and both call poll on the same task at the same time; since poll takes &mut self, which must be exclusive, this is a thread-safety violation.

The second problem is concurrency between a wake call and a poll call. Suppose a task is being polled, and during that poll its Waker was registered with a Reactor that suddenly becomes ready and calls Waker::wake, trying to push the task onto the queue. If the push succeeds, another thread can take the task and call poll while the first poll is still running, causing the same concurrency problem as above.

The async-task crate solves these concurrency problems nicely and provides a very elegant API. I have published a source-code analysis of it on Zhihu; take a look if you are interested.

Asynchronous task multithreaded Executor

Using async-task to handle these problems, the code looks like this:

use std::future::Future;
use std::thread;
use crossbeam::channel::{unbounded, Sender};
use once_cell::sync::Lazy;

static QUEUE: Lazy<Sender<async_task::Task<()>>> = Lazy::new(|| {
    let (sender, receiver) = unbounded::<async_task::Task<()>>();
    for _ in 0..4 {
        let recv = receiver.clone();
        thread::spawn(move || {
            for task in recv {
                task.run();
            }
        });
    }
    sender
});

fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()> 
where 
    F: Future<Output = R> + Send + 'static,
    R: Send + 'static,
{
    let schedule = |task| QUEUE.send(task).unwrap();
    let (task, handle) = async_task::spawn(future, schedule, ());
    task.schedule();
    handle
}

As you can see, the worker-thread code is basically the same as in the synchronous multi-threaded pool earlier; only the spawn function differs. With async-task, a multi-threaded pool for asynchronous tasks is this simple.
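Usage might look like the following sketch. Note that in this version of async-task, as I understand it, the JoinHandle is itself a Future yielding Option<R>, where None means the task was cancelled before completing.

fn main() {
    // Spawn a task and hand its JoinHandle to a second task.
    let handle = spawn(async { 1 + 2 });
    spawn(async move {
        if let Some(v) = handle.await {
            println!("1 + 2 = {}", v);
        }
    });
    // Crude for a demo: give the worker threads time to finish.
    std::thread::sleep(std::time::Duration::from_millis(100));
}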

Concurrency between Future and Reactor

If a Future is polled and isn’t ready, it registers its Waker with a Reactor, and this raises the problem of stale Wakers: the Executor may pass a different Waker on the second poll than on the first, and only the latest Waker is guaranteed to wake the task; the old one may not. This means the Waker stored in the Reactor must be updated on every poll to make sure the task can be woken.

For example, in the example above, the Future is interested in two events, corresponding to two Reactors. When it polls, the Future must register its Waker with Reactor 1 and with Reactor 2, and it must update both Wakers on every subsequent poll. The problem is that poll runs on the Executor thread while the Reactors run on Reactor threads: one thread writes the Waker while other threads try to read it. The simplest way to handle this is to add a lock, but then every Reactor has to lock and unlock around each access, which is clumsy and relatively expensive.

AtomicWaker handles this problem nicely. It stores the Waker in a single-writer, multiple-reader fashion: the AtomicWaker is shared by all the Reactors, the Waker only needs to be updated once per poll, and every Reactor gets the latest Waker.
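A minimal sketch of the pattern, using futures::task::AtomicWaker; the Shared struct and its field names are my own illustration:

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::task::AtomicWaker;

// State shared between one task and its reactors.
struct Shared {
    ready: AtomicBool,
    waker: AtomicWaker,
}

struct MyFuture {
    shared: Arc<Shared>,
}

impl Future for MyFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // One register call per poll replaces the stored Waker;
        // every reactor sharing `shared` wakes the freshest one.
        self.shared.waker.register(cx.waker());
        if self.shared.ready.load(Ordering::Acquire) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

// A reactor thread, once its event fires, runs:
//     shared.ready.store(true, Ordering::Release);
//     shared.waker.wake();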

Composability of a Future

Asynchronous tasks compose. For example, making an HTTPS request involves querying DNS to get the IP, establishing a TLS connection, sending the request data, and receiving the response data; each step is an asynchronous task, and putting them together yields one big asynchronous task. Futures themselves compose the same way, as in the following code:

future1
    .map(func)
    .then(func_return_future)
    .join(future2);

A Future only runs once it is sent to an Executor; the code above hasn’t been sent anywhere, so nothing executes yet. The code above is equivalent to:

Join::new(
    Then::new(
        Map::new(future1, func), 
        func_return_future
    ), 
    future2
);

It is declarative: it ultimately just produces a value, a tree structure like the one above. When the whole task is thrown onto an Executor, polling starts at the root Future and proceeds down to the leaves. The leaf futures at the bottom are the ones that specialize in dealing with Reactors, which is why most developers never need to care about Reactors and may not know the concept well.

When a leaf node is not ready, it registers the Waker handed down from the root with the Reactor. When the Reactor finds that the task can make progress, it calls the Waker to put the task back on the global queue. A thread then takes the task and polls again from the root node down. That is the whole execution process.

Efficiency of JoinN combination

The Future composition model above raises an efficiency question for JoinN combinators. Where does the problem come from? A Waker only wakes the whole task; it carries no information about why the task was woken. JoinN combines multiple Futures and runs them concurrently: Join4, say, combines four Futures and polls the children one by one on each poll, registering each unready child with its Reactor. Suppose the second child suddenly becomes ready: the next time Join4 is polled, it doesn’t know why it was woken, so it has to sweep through the children one by one, and the polls of the first, third, and fourth are wasted.

How do we solve this? futures-rs provides FuturesUnordered for exactly this. It can manage thousands of child futures and has a built-in concurrent queue holding the children that are already ready. When the Executor polls the whole task, it simply traverses that ready queue and executes its entries one by one. It doesn’t pass the Waker through unchanged; it wraps and intercepts it, so that when wake is called, the child Future is first added to the internal ready queue and only then is the Executor’s global queue notified. On the next poll, children are taken straight from the built-in ready queue, which is maximally efficient.
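Usage is straightforward; a small sketch with futures 0.3:

use futures::stream::{FuturesUnordered, StreamExt};

// Drive many child futures; only the ones whose Wakers actually
// fired are polled again, via FuturesUnordered's internal ready queue.
async fn run_many() {
    let mut children = FuturesUnordered::new();
    for i in 0..1000u32 {
        children.push(async move { i * 2 });
    }
    while let Some(v) = children.next().await {
        let _ = v; // results arrive in completion order, not push order
    }
}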

Synchronization between asynchronous tasks

Between asynchronous tasks there are also traditional synchronization needs, such as locking. Asynchronous tasks are not completely isolated from one another; they may interact and pass messages. Compare a thread with a Task:

| | thread | Task |
| ---- | ---- | ---- |
| sleep | thread::park | return Pending |
| wake | thread::unpark | Waker::wake |
| how to obtain | thread::current() | poll's Waker parameter |

If a thread wants to pause, it calls thread::park; if a task wants to pause, it returns Pending. A thread is woken with thread::unpark; a task needs Waker::wake. A thread handle is obtained directly with thread::current(); a task gets its Waker from poll’s parameter.
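This correspondence is enough to build a tiny block_on: the current thread acts as the Executor for a single task, parking when the task returns Pending and being unparked by the Waker. A sketch using the ArcWake helper from futures 0.3:

use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread::{self, Thread};
use futures::pin_mut;
use futures::task::{waker, ArcWake};

// wake == unpark: the Waker just unparks the blocked thread.
struct ThreadWaker(Thread);

impl ArcWake for ThreadWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    pin_mut!(fut);
    let waker = waker(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // sleep until woken
        }
    }
}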

Synchronization between asynchronous tasks: Mutex

The Mutex data structure has a field for the protected data, a locked atomic variable indicating whether the lock is held, and a wait queue. When an asynchronous task wants the lock, it tries to flip locked from false to true; if that succeeds, it holds the lock. If locked is already true, the task stores its Waker in the wait queue and waits to be notified. When the task holding the lock releases it, it sets locked back to false and takes one Waker out of the wait queue to wake that task up.
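A sketch of just this state machine (field and method names are my own; a real async Mutex such as the one in futures-rs is more careful about races between enqueueing and unlocking):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex as StdMutex;
use std::task::Waker;

struct LockState {
    locked: AtomicBool,            // is the lock currently held?
    waiters: StdMutex<Vec<Waker>>, // wait queue of parked tasks
}

impl LockState {
    // Called from poll: take the lock or park this task's Waker.
    fn try_lock_or_wait(&self, waker: &Waker) -> bool {
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return true; // lock acquired; caller returns Ready
        }
        self.waiters.lock().unwrap().push(waker.clone());
        false // caller returns Pending
    }

    fn unlock(&self) {
        self.locked.store(false, Ordering::Release);
        if let Some(w) = self.waiters.lock().unwrap().pop() {
            w.wake(); // one waiting task retries the lock
        }
    }
}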

Let me clear up a common misunderstanding here: many people think asynchronous tasks must use asynchronous locks, and that blocking on a synchronous lock is bad. That is not correct. Most wait-queue implementations use a synchronous lock internally, which means an async Mutex is not completely asynchronous: there is a synchronous lock inside it. If you just want to protect a piece of data, doing quick updates to shared state in your application, you should use the std synchronous lock, because updating the internal wait queue of an asynchronous lock itself takes a synchronous lock, which costs more than updating the shared data directly with a synchronous lock.

So when should you use an asynchronous lock? When protecting IO resources: if your critical section spans multiple .await points and the time between acquiring and releasing can be long, prefer the asynchronous lock.
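For example (a sketch using futures::lock::Mutex; any async mutex works the same way), here the guard is held across an .await, which is exactly where a std sync lock would block the whole worker thread:

use std::sync::Arc;
use futures::lock::Mutex; // an asynchronous mutex

async fn append_line(log: Arc<Mutex<Vec<String>>>, line: String) {
    let mut guard = log.lock().await; // waits by yielding the task
    // Imagine an async IO call here; the lock stays held across
    // the .await, which is when an async lock is justified.
    guard.push(line);
} // guard dropped: the next waiting task is woken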

Synchronization between asynchronous tasks: Oneshot

What does Oneshot do? It passes one piece of data between two tasks: one task executes while the other waits, and the former hands its result to the latter through the Oneshot. The figure shows Oneshot’s data structure. The state field packs in a lot of metadata: whether the data has been written, whether the sender has been dropped, whether a TxWaker is stored, whether an RxWaker is stored, and whether the receiver has been dropped.

When the sender sends data, it can freely access the data right up until it updates the state, since the receiver won’t touch the data before then. After writing the data, it updates the state to record that the data has been written, then takes out the receiver’s RxWaker and wakes it; on its next execution, the receiving task can read the data. If the sender is dropped without ever sending, its destructor must account for the receiver that may still be waiting: it too updates the state and wakes the RxWaker, telling the receiver not to wait any longer.

The receiver is implemented as a Future. When polled, it reads the state: if data is present, the sender has finished writing, and the receiver reads it directly. If not, it stores its own Waker as the Oneshot’s RxWaker and updates the state to record that an RxWaker now exists. When the receiver is dropped, it tells the sender “I’m no longer interested in your data; you don’t need to keep computing”: its drop updates the state and takes the sender’s TxWaker out of the Oneshot to wake the sender.
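The futures crate’s oneshot channel implements exactly this kind of protocol; a short usage sketch:

use futures::channel::oneshot;

async fn demo() {
    let (tx, rx) = oneshot::channel::<u64>();
    std::thread::spawn(move || {
        // Dropping tx without sending would instead complete rx
        // with Err(Canceled) -- the "sender destroyed" state above.
        let _ = tx.send(42);
    });
    match rx.await {
        Ok(v) => println!("got {}", v),
        Err(_canceled) => println!("sender dropped without sending"),
    }
}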

Synchronization between asynchronous tasks: WaitGroup

Next up is a WaitGroup I implemented myself. WaitGroup is very common in Golang: it lets you start multiple subtasks and wait for all of them to complete before continuing. Here is a short demo:

use waitgroup::WaitGroup;
use async_std::task;

async {
    let wg = WaitGroup::new();
    for _ in 0..100 {
        let w = wg.worker();
        task::spawn(async move {
            drop(w);
        });
    }
    wg.wait().await;
}

First create a WaitGroup, then create 100 workers. When each subtask finishes, it simply drops its worker, marking that task as complete. The WaitGroup waits until all subtasks are complete before continuing. Here is the implementation, which is actually quite simple:

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use futures::task::AtomicWaker;

struct Inner {
    waker: AtomicWaker,
}

impl Drop for Inner {
    fn drop(&mut self) {
        self.waker.wake();
    }
}

pub struct Worker {
    inner: Arc<Inner>,
}

pub struct WaitGroup {
    inner: Weak<Inner>,
}

impl Future for WaitGroup {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.upgrade() {
            Some(inner) => {
                inner.waker.register(cx.waker());
                Poll::Pending
            }
            None => Poll::Ready(()),
        }
    }
}

Note that a worker that finishes its task does not need to call the Waker. WaitGroup only cares that all tasks are finished, so only the last worker needs to wake the Waker. How do we know which worker is last? We can borrow Arc from the standard library: Arc is a shared reference, and when the last strong reference is destroyed, the wrapped data is dropped. So we simply call wake on the Waker in the Drop impl of the data wrapped by the Arc.

WaitGroup holds a weak reference, and every worker holds a strong one. When WaitGroup is polled, it tries to upgrade its weak reference to a strong one. If the upgrade fails, all strong references are gone, meaning all workers have finished, so it returns Ready. If the upgrade succeeds, at least one strong reference still exists, so it registers the Waker with the AtomicWaker and returns Pending. There is a boundary condition here: all the remaining workers may be dropped right after the upgrade succeeds, leaving no worker to call wake. No notification is lost, though, because the successful upgrade itself created a temporary strong reference, inner; when poll finishes and that temporary reference is dropped, it is the last one, so Inner’s Drop impl runs and waker.wake() wakes the task. With that, the whole flow is complete.

This article is published in the January issue of Rust Magazine and is licensed under a CC BY-NC-ND 4.0 International License. Please note the source for non-commercial reprint.