Author: FMhai/Xu Shuai


This article describes how to design and implement a Runtime for the thread-per-core model based on io-uring.

Our Runtime, Monoio, is now open source and can be found at github.com/bytedance/m…

We will introduce it in two parts:

  • The popular science article: a brief introduction to asynchronous IO and the main abstractions required in a Rust Runtime.
  • The implementation article: the implementation details of Monoio.

The popular science article

epoll & io-uring

In order to achieve asynchronous concurrency, we need the kernel to provide the ability to handle other tasks while one IO is blocked.

epoll

Epoll is an excellent IO event notification mechanism in Linux. It can monitor the readiness of multiple FDs at the same time.

It mainly consists of three syscalls:

  1. epoll_create: creates an epoll fd.
  2. epoll_ctl: adds, modifies, or deletes the fd events that the epoll fd listens for.
  3. epoll_wait: waits for the registered fd events and returns when any event occurs. A timeout can also be passed in, so that the call returns after the timeout even if no event is ready.

If you don’t use epoll and issue syscalls directly, the FD has to stay in blocking mode (the default), so that a read from the FD blocks until there is something to read.

When using epoll, the FD needs to be set to non-blocking mode. A read then returns WOULD_BLOCK immediately when there is no data. All you need to do at that point is register the FD with the epoll FD and set the EPOLLIN event.

Then, when there is nothing left to do (all tasks are stuck on IO), the thread enters the epoll_wait syscall. When an event is returned, the corresponding FD is read (depending on the trigger mode set at registration time, something else may need to be done to make sure the next read is notified as well).

In summary, the mechanism is simple: set the FD to non-blocking mode, register it with the epoll FD when needed, and operate on the FD when the epoll FD reports an event. This turns the problem of blocking on multiple FDs into the problem of blocking on a single FD.
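The non-blocking half of this mechanism can be tried out with the standard library alone. The following is a minimal sketch, not epoll itself: it assumes a Unix platform, uses a `UnixStream` pair as a stand-in for a network socket, and the helper names `try_read` and `nonblocking_demo` are made up for illustration. The `Ok(None)` branch marks the point where a real runtime would register the FD with epoll and wait.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;

/// Tries a single read on a non-blocking socket.
/// Returns Ok(None) when the read would block (hypothetical helper).
fn try_read(stream: &mut UnixStream, buf: &mut [u8]) -> io::Result<Option<usize>> {
    match stream.read(buf) {
        Ok(n) => Ok(Some(n)),
        // This is where an epoll-based runtime would register EPOLLIN and wait.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

/// Returns the read result before and after the peer has written data.
fn nonblocking_demo() -> io::Result<(Option<usize>, Option<usize>)> {
    let (mut a, mut b) = UnixStream::pair()?;
    a.set_nonblocking(true)?;
    let mut buf = [0u8; 16];
    let before = try_read(&mut a, &mut buf)?; // nothing written yet
    b.write_all(b"ping")?;
    let after = try_read(&mut a, &mut buf)?; // 4 bytes are now available
    Ok((before, after))
}
```

Calling `nonblocking_demo()` from `main` shows the read returning immediately in both cases: first without data, then with the four bytes written by the peer.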

io-uring

Unlike epoll, io-uring is not an event notification mechanism; it is a true asynchronous syscall mechanism. You don’t need to issue the syscall yourself after being notified, because it has already been done for you.

io-uring mainly consists of two rings (SQ and CQ): SQ is used to submit tasks and CQ is used to receive completion notifications. A task (Op) usually corresponds to a syscall (for example, ReadOp corresponds to read) and carries the parameters and flags for that syscall.

On submit, the kernel consumes all SQEs and registers callbacks. Later, when data becomes available (for example, a NIC interrupt fires and the data is read in through the driver), the kernel triggers these callbacks to do what the Op asked for, such as reading from a particular FD into a buffer (the buffer specified by the user). Compared to epoll, io-uring is purely asynchronous.

Note: the io-uring behavior described in this section is that of FAST_POLL.

Git.kernel.org/pub/scm/lin…

To summarize, io-uring is used in almost the same way as epoll. When there is nothing to do (all tasks are stuck on IO), call submit_and_wait(1) (liburing’s wrapper around enter); after it returns, consume the CQ to get the syscall results. If you care about latency, you can submit more aggressively, so that tasks are pushed to the kernel early and return as soon as the data is ready, at the price of more syscalls.
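To make the submit-then-complete flow concrete, here is a toy model written entirely in user space. It is not the io_uring API and performs no syscalls: `ToyRing`, `Sqe`, `Cqe` and `toy_ring_demo` are hypothetical names, and `submit_and_wait` simply plays the role of the kernel by executing every queued operation and posting a completion.

```rust
use std::collections::VecDeque;

/// A submitted operation: "read from `source` into `buf`" (toy stand-in for an SQE).
struct Sqe { user_data: u64, source: Vec<u8>, buf: Vec<u8> }

/// A completion: the result plus the buffer handed back (toy stand-in for a CQE).
struct Cqe { user_data: u64, result: usize, buf: Vec<u8> }

#[derive(Default)]
struct ToyRing { sq: VecDeque<Sqe>, cq: VecDeque<Cqe> }

impl ToyRing {
    /// Queue an operation; nothing is executed yet.
    fn push(&mut self, sqe: Sqe) { self.sq.push_back(sqe); }

    /// Plays the kernel: consume every SQE, do the work, post a CQE.
    /// Returns the number of operations completed.
    fn submit_and_wait(&mut self) -> usize {
        let mut completed = 0;
        while let Some(mut sqe) = self.sq.pop_front() {
            let len = sqe.source.len().min(sqe.buf.len());
            sqe.buf[..len].copy_from_slice(&sqe.source[..len]);
            self.cq.push_back(Cqe { user_data: sqe.user_data, result: len, buf: sqe.buf });
            completed += 1;
        }
        completed
    }

    /// The user only consumes results; no further "syscall" per operation.
    fn pop_completion(&mut self) -> Option<Cqe> { self.cq.pop_front() }
}

fn toy_ring_demo() -> (u64, usize, Vec<u8>) {
    let mut ring = ToyRing::default();
    ring.push(Sqe { user_data: 7, source: b"hello".to_vec(), buf: vec![0; 8] });
    ring.submit_and_wait();
    let cqe = ring.pop_completion().expect("one completion was posted");
    (cqe.user_data, cqe.result, cqe.buf[..cqe.result].to_vec())
}
```

The point of the model is the shape of the interaction: the caller describes the whole operation up front, identifies it with `user_data`, and later receives the finished result, instead of being told that an FD is ready and then reading it.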

The execution flow of asynchronous tasks

In ordinary programming, the control flow of our code corresponds to a thread. Just as you would expect when writing C, the code is compiled directly into assembly instructions and executed by the “threads” the operating system provides, with no extra logic inserted.

Taking epoll as an example, epoll-based asynchrony is essentially the multiplexing of a thread. Code like the following, written in the usual style, therefore cannot be used in this scenario:

for connection = listener.accept():
    do_something(connection)

Because accept waits for IO here, blocking on it blocks the whole thread, and no other task can run.

Callback-oriented programming

Libevent, commonly used in C/C++, follows this model. User code does not own the control flow (there is only one thread, while the user has thousands of tasks); instead, it registers a callback with libevent and associates it with an event. When the event occurs, libevent calls the user’s callback and passes the event parameters to it. After initializing its callbacks, the user hands control of the thread to libevent, which handles the interaction with epoll internally and executes the callbacks when they are ready.

This is efficient, but it can be daunting to write. For example, to make an HTTP request you need to split the code into multiple synchronous functions and string them together via callbacks.

Behavior that could otherwise be condensed into a single function is broken down into a pile of functions. Compared with procedural programming, this state-oriented programming is messy and error-prone when the programmer forgets some detail.
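As a rough illustration of this style (not libevent’s API), the sketch below registers callbacks by event name and lets a small loop object dispatch them. `EventLoop`, `on`, `dispatch` and the event names are all invented for the example; the "request" is just two log lines, but it already has to be split across two functions.

```rust
use std::collections::HashMap;

/// A callback receives the event payload and a shared log to write to.
type Callback = Box<dyn FnMut(&str, &mut Vec<String>)>;

/// Hypothetical event loop: owns the callbacks and decides when they run.
struct EventLoop {
    callbacks: HashMap<String, Callback>,
    log: Vec<String>,
}

impl EventLoop {
    fn new() -> Self {
        EventLoop { callbacks: HashMap::new(), log: Vec::new() }
    }

    /// Associate a callback with an event name.
    fn on(&mut self, event: &str, cb: impl FnMut(&str, &mut Vec<String>) + 'static) {
        self.callbacks.insert(event.to_string(), Box::new(cb));
    }

    /// Called by the loop when an event fires; user code never drives this itself.
    fn dispatch(&mut self, event: &str, payload: &str) {
        if let Some(cb) = self.callbacks.get_mut(event) {
            cb(payload, &mut self.log);
        }
    }
}

/// One logical "HTTP request", split into one callback per step.
fn callback_demo() -> Vec<String> {
    let mut el = EventLoop::new();
    el.on("connected", |peer, log| log.push(format!("send request to {}", peer)));
    el.on("readable", |data, log| log.push(format!("parse response: {}", data)));
    // In a real loop these would be triggered by epoll events.
    el.dispatch("connected", "example.org");
    el.dispatch("readable", "200 OK");
    el.log
}
```

Any state that has to survive from the "connected" step to the "readable" step would need to be stored somewhere both callbacks can reach, which is exactly the bookkeeping the text describes as error-prone.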

Stackful coroutines

What if we could insert some logic between the user code and the final product? In Golang, for example, user code only corresponds to goroutines that can be scheduled, and it is the Go runtime that actually owns control of the thread. A goroutine can be scheduled by the runtime and preempted during execution.

When a goroutine needs to be interrupted and switched to another goroutine, the runtime only needs to switch the current stack frame. The stack of each goroutine actually lives on the heap, so it can be interrupted and resumed at any time.

The network library cooperates with the runtime as well: syscalls are non-blocking and are automatically registered with netpoll.

Stackful coroutines, together with the runtime, decouple task-level user code from threads.

Future-based stackless coroutines

The context-switching overhead of stackful coroutines cannot be ignored. Because a coroutine can be interrupted at any time, the register context at that moment has to be saved; otherwise the coroutine could not be restored to where it left off.

Stackless coroutines do not have this problem, and the model fits Rust’s zero-cost abstractions very well. async + await in Rust is essentially automatic code expansion: the compiler turns the async + await code into a Generator-based state machine. The state machine implements Future and interacts with the Runtime through poll (see this article for details).
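To give a feel for what such a state machine looks like, here is a hand-written sketch of roughly what an `async` block with a single suspension point could expand to. It is an illustration under simplifying assumptions, not the compiler’s actual output; `AddAfterYield`, `NoopWake` and `drive` are names made up for the example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written state machine: yield once, then return a + b.
enum AddAfterYield {
    Start { a: u32, b: u32 },
    Yielded { a: u32, b: u32 },
    Done,
}

impl Future for AddAfterYield {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let this = self.get_mut(); // fine: the enum only holds plain integers
        match *this {
            AddAfterYield::Start { a, b } => {
                // Remember where we are, ask to be polled again, and suspend.
                *this = AddAfterYield::Yielded { a, b };
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            AddAfterYield::Yielded { a, b } => {
                *this = AddAfterYield::Done;
                Poll::Ready(a + b)
            }
            AddAfterYield::Done => panic!("polled after completion"),
        }
    }
}

/// A waker that does nothing; enough for driving the future by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls twice: the first poll suspends, the second one completes.
fn drive() -> (bool, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = AddAfterYield::Start { a: 1, b: 2 };
    let mut fut = Pin::new(&mut fut);
    let first_pending = fut.as_mut().poll(&mut cx).is_pending();
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => (first_pending, v),
        Poll::Pending => unreachable!("the state machine has only one suspension point"),
    }
}
```

Everything the coroutine needs across the suspension point lives in the enum variants, so no separate stack or register context has to be saved.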

How the Rust asynchronous mechanism works

Rust’s asynchronous design is complex; the standard library interface is decoupled from the Runtime implementation.

Rust’s asynchrony relies on the Future trait.

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

How is a Future executed? As the trait definition above shows, through its poll method. The result it returns is a Poll, which is either Pending or Ready(T).

So who calls the poll method?

  1. The user. A user can implement a Future manually, for example by wrapping another Future; the wrapper obviously has to implement poll and call inner.poll.
  2. The Runtime. The Runtime is the ultimate caller of poll.
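Both callers can be shown in a few lines. In the sketch below, `CountPolls` is a user-written wrapper that forwards to `inner.poll`, and `run_to_completion` plays the Runtime by polling in a loop. All names (`CountPolls`, `YieldOnce`, `NoopWake`, `run_to_completion`) are invented for illustration, and the busy loop is a simplification of what a real Runtime does.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// User-side caller: wraps another future and counts how often it is polled.
struct CountPolls<F> { inner: F, polls: usize }

impl<F: Future + Unpin> Future for CountPolls<F> {
    type Output = (F::Output, usize);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.polls += 1;
        let polls = this.polls;
        // The wrapper implements poll by calling the inner future's poll.
        Pin::new(&mut this.inner).poll(cx).map(|v| (v, polls))
    }
}

/// Inner future: pending on the first poll, ready on the second.
struct YieldOnce { yielded: bool }

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.yielded {
            Poll::Ready("done")
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Runtime-side caller: the outermost loop that keeps calling poll.
fn run_to_completion<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(v) = Pin::new(&mut fut).poll(&mut cx) {
            return v;
        }
    }
}
```

Running `run_to_completion(CountPolls { inner: YieldOnce { yielded: false }, polls: 0 })` yields the inner result together with a poll count of 2: one poll that suspended and one that completed.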

As implementors of a Future, we need to ensure that once Poll::Pending is returned, the Future is woken up when the IO it depends on becomes ready. Waking up a Future is done through the Waker in the Context. What happens after the wake-up is decided by whoever provided the cx, i.e. the Runtime itself (for example, adding the Task back to the queue to be executed).

So anything that produces events is responsible for storing the Waker and waking it when the event is ready; the provider of the cx receives the wake-up and is responsible for rescheduling. These two roles correspond to the Reactor and the Executor respectively, and they are decoupled through Waker and Future.
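The division of labor can be sketched with a single shared slot. In this hypothetical example, `WaitSignal` is the Future, `fire` is the event source (the Reactor role) that stores and wakes the Waker, and `FlagWake` stands in for the Executor by merely recording that a wake-up arrived. None of these names come from a real library.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// State shared between the event source and the future waiting on it.
#[derive(Default)]
struct Signal { fired: bool, waker: Option<Waker> }

/// Future side: if the event has not fired, store the Waker and return Pending.
struct WaitSignal(Rc<RefCell<Signal>>);

impl Future for WaitSignal {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut signal = self.0.borrow_mut();
        if signal.fired {
            Poll::Ready(())
        } else {
            signal.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Reactor role: mark the event ready, then wake whoever registered.
fn fire(signal: &Rc<RefCell<Signal>>) {
    let waker = {
        let mut s = signal.borrow_mut();
        s.fired = true;
        s.waker.take()
    };
    if let Some(w) = waker {
        w.wake();
    }
}

/// Executor role (reduced to a flag): a real one would re-queue the task here.
struct FlagWake(AtomicBool);
impl Wake for FlagWake {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Returns (pending before the event, woken by the event, ready afterwards).
fn reactor_executor_demo() -> (bool, bool, bool) {
    let flag = Arc::new(FlagWake(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);

    let signal = Rc::new(RefCell::new(Signal::default()));
    let mut fut = WaitSignal(signal.clone());

    let pending = Pin::new(&mut fut).poll(&mut cx).is_pending();
    fire(&signal);
    let woken = flag.0.load(Ordering::SeqCst);
    let ready = Pin::new(&mut fut).poll(&mut cx).is_ready();
    (pending, woken, ready)
}
```

Note that `fire` knows nothing about how tasks are scheduled, and `FlagWake` knows nothing about where the event came from; the Waker is the only thing they share.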

Reactor

For example, you could even implement your own Reactor on top of Tokio. In fact, tokio-uring does just that: it registers a uring fd with Tokio (mio), and based on that fd and its own pending-Op management it exposes the capabilities of a Reactor as an event source. In tokio-tungstenite, the problem of waking both the reader and the writer is likewise solved through WakerProxy.

Another example is the timer driver. IO events are obviously signaled by epoll/io-uring and the like, but timers are not: the driver maintains a timing wheel or an N-ary heap internally, so waking the Waker has to be the responsibility of the Time Driver, which therefore has to store the Waker. In this sense the Time Driver is also a Reactor implementation.
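A time driver in this sense can be sketched very compactly. The version below deliberately swaps the timing wheel or heap for a `BTreeMap` ordered by deadline, and uses an abstract tick counter instead of real time so that the behavior is deterministic. `TimeDriver`, `register`, `advance` and `CountWake` are hypothetical names.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Stores Wakers ordered by (deadline, id); the id keeps equal deadlines distinct.
struct TimeDriver {
    next_id: u64,
    timers: BTreeMap<(u64, u64), Waker>,
}

impl TimeDriver {
    fn new() -> Self {
        TimeDriver { next_id: 0, timers: BTreeMap::new() }
    }

    /// Called by a sleep-like future when it returns Pending.
    fn register(&mut self, deadline: u64, waker: Waker) {
        self.timers.insert((deadline, self.next_id), waker);
        self.next_id += 1;
    }

    /// Wakes every timer whose deadline is <= now; returns how many were woken.
    fn advance(&mut self, now: u64) -> usize {
        let mut woken = 0;
        loop {
            // The smallest key is the earliest deadline.
            let first = self.timers.keys().next().copied();
            match first {
                Some(key) if key.0 <= now => {
                    if let Some(waker) = self.timers.remove(&key) {
                        waker.wake();
                        woken += 1;
                    }
                }
                _ => break,
            }
        }
        woken
    }
}

/// Test waker that counts wake-ups.
struct CountWake(AtomicUsize);
impl Wake for CountWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

fn timer_demo() -> (usize, usize, usize, usize) {
    let counter = Arc::new(CountWake(AtomicUsize::new(0)));
    let mut driver = TimeDriver::new();
    for deadline in [5, 10, 20] {
        driver.register(deadline, Waker::from(counter.clone()));
    }
    let at_10 = driver.advance(10); // deadlines 5 and 10 expire
    let at_15 = driver.advance(15); // nothing new
    let at_20 = driver.advance(20); // deadline 20 expires
    (at_10, at_15, at_20, counter.0.load(Ordering::SeqCst))
}
```

In a real Runtime, the earliest deadline would also be used as the timeout passed to epoll_wait or its io-uring equivalent, so that the thread wakes up in time to call `advance`.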

Executor

Having covered the Reactor, let’s talk about the Executor. The Executor is responsible for scheduling and executing tasks. Take the thread-per-core scenario as an example (why this example? Because without cross-thread scheduling it is easy to write): the task queue can be implemented as a VecDeque, and all the Executor does is take tasks from it and call their poll methods.
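A single-threaded executor of this kind fits in a few dozen lines. The sketch below makes one loudly stated simplification: a task that returns Pending is pushed straight back onto the queue instead of waiting for its Waker, so it busy-polls. `MiniExecutor`, `Steps` and `NoopWake` are hypothetical names, and the tasks are hand-written futures to keep the example independent of any particular Runtime.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type Task = Pin<Box<dyn Future<Output = ()>>>;

#[derive(Default)]
struct MiniExecutor { queue: VecDeque<Task> }

impl MiniExecutor {
    /// No Send bound is needed: tasks never leave this thread.
    fn spawn(&mut self, fut: impl Future<Output = ()> + 'static) {
        self.queue.push_back(Box::pin(fut));
    }

    /// Runs until the queue is empty; returns the number of poll calls made.
    fn run(&mut self) -> usize {
        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut polls = 0;
        while let Some(mut task) = self.queue.pop_front() {
            polls += 1;
            if task.as_mut().poll(&mut cx).is_pending() {
                // Simplification: re-queue immediately rather than on wake-up.
                self.queue.push_back(task);
            }
        }
        polls
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// A task that needs `remaining` polls and logs its name on each one.
struct Steps { name: &'static str, remaining: u32, log: Rc<RefCell<Vec<&'static str>>> }

impl Future for Steps {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.log.borrow_mut().push(self.name);
        self.remaining -= 1;
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn executor_demo() -> (Vec<&'static str>, usize) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut ex = MiniExecutor::default();
    ex.spawn(Steps { name: "a", remaining: 2, log: log.clone() });
    ex.spawn(Steps { name: "b", remaining: 1, log: log.clone() });
    let polls = ex.run();
    let order = log.borrow().clone();
    (order, polls)
}
```

The recorded order is a, b, a: task "a" suspends once, "b" runs to completion in between, and "a" finishes on its second poll. The use of `Rc` and `RefCell` inside tasks is only possible because nothing here requires `Send`.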

IO components

You might wonder: since the Reactor is responsible for waking up tasks and the Executor is responsible for executing woken tasks, who is responsible for registering IO with the Reactor?

IO libraries (such as a TcpStream implementation) are responsible for registering with the Reactor any IO that cannot be completed immediately. This is one of the reasons why you have to use tokio::net::TcpStream instead of the standard library’s when using Tokio, and why you need the sleep method provided by the Runtime if you want to sleep asynchronously.

Implement a minimalist Runtime

For simplicity, we write this Runtime using epoll. This is not our final product, but just to demonstrate how to implement the simplest Runtime.

The complete code for this section is available at github.com/ihciah/mini… .

The Reactor, Executor and IO components are implemented separately. Let’s start with Reactor.

Reactor

Because using bare epoll is rather unpleasant and Rust bindings are not the focus of this article, the polling crate is used for the epoll operations.

The basic usage of the polling crate is to create a Poller and then add or modify fds and their interest on the Poller. The crate works in oneshot mode (EPOLLONESHOT) by default, so interest has to be re-registered after an event fires. This may be useful in multi-threaded scenarios, but in our minimalist single-threaded version it seems unnecessary, since it costs an extra epoll_ctl syscall. For simplicity’s sake, we still use it.

As a Reactor, when we register interest with the Poller we need to provide an identifier, which other places often call a Token, UserData or Key. When the event is ready, this identifier is returned as is.

So what we need to do is roughly this:

  1. Create a Poller.
  2. When we care about the Readable or Writable readiness of a particular fd, add an interest Event to the Poller and save the Waker.
  3. Before adding the interest Event, allocate a Token, so that when the Event becomes ready we know where the corresponding Waker is.

So we can design a Reactor as:

pub struct Reactor {
    poller: Poller,
    waker_mapping: rustc_hash::FxHashMap<u64, Waker>,
}

Other Runtime implementations tend to use a slab, which handles both Token allocation and Waker storage.

For simplicity, a HashMap is used here to store the Token-to-Waker relationship. Waker storage uses a small trick: since we only care about read and write, we define the MapKey for read as fd * 2 and the MapKey for write as fd * 2 + 1 (read and write each have their own Waker); the Event’s UserData (Token) is still the fd itself.

impl Reactor {
    pub fn modify_readable(&mut self, fd: RawFd, cx: &mut Context) {
        println!("[reactor] modify_readable fd {} token {}", fd, fd * 2);

        self.push_completion(fd as u64 * 2, cx);
        let event = polling::Event::readable(fd as usize);
        self.poller.modify(fd, event);
    }

    pub fn modify_writable(&mut self, fd: RawFd, cx: &mut Context) {
        println!("[reactor] modify_writable fd {}, token {}", fd, fd * 2 + 1);

        self.push_completion(fd as u64 * 2 + 1, cx);
        let event = polling::Event::writable(fd as usize);
        self.poller.modify(fd, event);
    }

    fn push_completion(&mut self, token: u64, cx: &mut Context) {
        println!("[reactor token] token {} waker saved", token);

        self.waker_mapping.insert(token, cx.waker().clone());
    }
}
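The fd-to-key scheme used in this Reactor can be isolated into two small functions. This is only a restatement of the fd * 2 and fd * 2 + 1 rule for clarity; `Interest`, `map_key` and `from_map_key` are hypothetical names, and `i32` stands in for `RawFd`.

```rust
/// Which direction of readiness a stored Waker is waiting for.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Interest { Read, Write }

/// Read wakers live at fd * 2, write wakers at fd * 2 + 1.
fn map_key(fd: i32, interest: Interest) -> u64 {
    let base = fd as u64 * 2;
    match interest {
        Interest::Read => base,
        Interest::Write => base + 1,
    }
}

/// Inverse mapping: recover the fd and the direction from a key.
fn from_map_key(key: u64) -> (i32, Interest) {
    let interest = if key % 2 == 0 { Interest::Read } else { Interest::Write };
    ((key / 2) as i32, interest)
}
```

Because even keys are always reads and odd keys are always writes, one HashMap can hold two independent Wakers per fd without any additional bookkeeping.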

Attaching an fd to the Poller or removing it is also easy:

impl Reactor {
    pub fn add(&mut self, fd: RawFd) {
        println!("[reactor] add fd {}", fd);

        let flags =
            nix::fcntl::OFlag::from_bits(nix::fcntl::fcntl(fd, nix::fcntl::F_GETFL).unwrap())
                .unwrap();
        let flags_nonblocking = flags | nix::fcntl::OFlag::O_NONBLOCK;
        nix::fcntl::fcntl(fd, nix::fcntl::F_SETFL(flags_nonblocking)).unwrap();
        self.poller
            .add(fd, polling::Event::none(fd as usize))
            .unwrap();
    }

    pub fn delete(&mut self, fd: RawFd) {
        println!("[reactor] delete fd {}", fd);

        // Drop any wakers still stored for this fd (read key, then write key).
        self.waker_mapping.remove(&(fd as u64 * 2));
        println!("[reactor token] token {} completion removed", fd as u64 * 2);
        self.waker_mapping.remove(&(fd as u64 * 2 + 1));
        println!(
            "[reactor token] token {} completion removed",
            fd as u64 * 2 + 1
        );
    }
}

One caveat: the fd should be set to non-blocking before it is attached; otherwise, a spurious wake-up (epoll does not guarantee the absence of spurious wake-ups) would make the subsequent syscall block.

Then we face a question: when do we call epoll_wait? The answer is when there are no tasks to run. If all tasks are waiting for IO, we can safely enter the syscall. So our Reactor needs to expose a wait interface for the Executor to call when there are no tasks.

pub struct Reactor {
    poller: Poller,
    waker_mapping: rustc_hash::FxHashMap<u64, Waker>,

    buffer: Vec<Event>,
}

impl Reactor {
    pub fn wait(&mut self) {
        println!("[reactor] waiting");
        self.poller.wait(&mut self.buffer, None);
        println!("[reactor] wait done");

        for _ in 0..self.buffer.len() {
            let event = self.buffer.swap_remove(0);
            if event.readable {
                if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2)) {
                    println!(
                        "[reactor token] fd {} read waker token {} removed and woken",
                        event.key,
                        event.key * 2
                    );
                    waker.wake();
                }
            }
            if event.writable {
                if let Some(waker) = self.waker_mapping.remove(&(event.key as u64 * 2 + 1)) {
                    println!(
                        "[reactor token] fd {} write waker token {} removed and woken",
                        event.key,
                        event.key * 2 + 1
                    );
                    waker.wake();
                }
            }
        }
    }
}

A pre-allocated buffer (Vec<Event>) needs to be provided to receive the syscall results. To avoid allocating it on every call, we store the buffer directly in the struct.

In wait, we need to:

  1. Enter the syscall.
  2. When the syscall returns, process all ready events. If an event is readable or writable, find the corresponding Waker in the HashMap, remove it, and wake it. (The mapping between fd and map key was described earlier: readable corresponds to fd * 2, writable to fd * 2 + 1.)

Finally, complete the Reactor’s constructor:

impl Reactor {
    pub fn new() -> Self {
        Self {
            poller: Poller::new().unwrap(),
            waker_mapping: Default::default(),

            buffer: Vec::with_capacity(2048),
        }
    }
}

impl Default for Reactor {
    fn default() -> Self {
        Self::new()
    }
}

At this point our Reactor is done. Overall, it is a wrapper around epoll plus the storage and wake-up of Wakers.

Executor

The Executor needs to store tasks and execute them.

Task

What is a Task? A Task is essentially a Future, but since a Task needs shared ownership, we store it in an Rc; and since we only know that the user handed us a Future, not its concrete type, we need to box it, using LocalBoxFuture. Adding interior mutability, Task is defined as follows:

pub struct Task {
    future: RefCell<LocalBoxFuture<'static, ()>>,
}

TaskQueue

Next, design the storage structure for Tasks. For simplicity, a VecDeque is used directly.

pub struct TaskQueue {
    queue: RefCell<VecDeque<Rc<Task>>>,
}

The TaskQueue needs to be able to push and pop tasks:

impl TaskQueue {
    pub(crate) fn push(&self, runnable: Rc<Task>) {
        println!("add task");
        self.queue.borrow_mut().push_back(runnable);
    }

    pub(crate) fn pop(&self) -> Option<Rc<Task>> {
        println!("remove task");
        self.queue.borrow_mut().pop_front()
    }
}

Waker

The Executor needs to provide the Context, which contains a Waker. When woken, the Waker needs to push the task into the execution queue.

pub struct Waker {
    waker: RawWaker,
}

pub struct RawWaker {
    data: *const (),
    vtable: &'static RawWakerVTable,
}

Waker implements dynamic dispatch through a raw data pointer and a vtable. So there are two things we need to do:

  1. Get a pointer to the Task structure and maintain its reference count.
  2. Generate the vtable for the type.

We can define the vtable as follows:

struct Helper;

impl Helper {
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

    unsafe fn clone_waker(data: *const ()) -> RawWaker {
        increase_refcount(data);
        let vtable = &Self::VTABLE;
        RawWaker::new(data, vtable)
    }

    unsafe fn wake(ptr: *const ()) {
        let rc = Rc::from_raw(ptr as *const Task);
        rc.wake_();
    }

    unsafe fn wake_by_ref(ptr: *const ()) {
        let rc = mem::ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
        rc.wake_by_ref_();
    }

    unsafe fn drop_waker(ptr: *const ()) {
        drop(Rc::from_raw(ptr as *const Task));
    }
}

unsafe fn increase_refcount(data: *const ()) {
    let rc = mem::ManuallyDrop::new(Rc::<Task>::from_raw(data as *const Task));
    let _rc_clone: mem::ManuallyDrop<_> = rc.clone();
}

The reference count is managed manually: we get a raw pointer to the Rc<Task> via Rc::into_raw, use it together with the vtable to build a RawWaker, and then build the Waker. In the vtable implementation we have to manage the reference count by hand with care: clone_waker, for example, only copies a pointer, but semantically it creates a copy, so we need to increment the reference count by one manually.
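The effect of each vtable function on the reference count can be observed directly with `Rc::strong_count`. The following standalone sketch follows the same pattern with the standard library only; its `Task` is a hypothetical stand-in that merely counts wake-ups instead of pushing itself onto a queue, and the free function `waker` is likewise an assumption about how the pieces are put together.

```rust
use std::cell::Cell;
use std::mem::ManuallyDrop;
use std::rc::Rc;
use std::task::{RawWaker, RawWakerVTable, Waker};

/// Stand-in task: records how many times it was woken.
struct Task { wakes: Cell<u32> }

static VTABLE: RawWakerVTable =
    RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);

unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
    // Borrow the Rc without owning it, then leak one extra strong count
    // on behalf of the new Waker.
    let rc = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
    let _extra = ManuallyDrop::new(Rc::clone(&*rc));
    RawWaker::new(ptr, &VTABLE)
}

unsafe fn wake(ptr: *const ()) {
    // Consumes the Waker: the Rc is dropped when this function returns.
    let rc = Rc::from_raw(ptr as *const Task);
    rc.wakes.set(rc.wakes.get() + 1);
}

unsafe fn wake_by_ref(ptr: *const ()) {
    // Does not consume the Waker: the count must stay unchanged.
    let rc = ManuallyDrop::new(Rc::from_raw(ptr as *const Task));
    rc.wakes.set(rc.wakes.get() + 1);
}

unsafe fn drop_waker(ptr: *const ()) {
    drop(Rc::from_raw(ptr as *const Task));
}

/// Builds a Waker that owns one strong reference to the task.
fn waker(task: Rc<Task>) -> Waker {
    let ptr = Rc::into_raw(task) as *const ();
    // Safety: the vtable keeps the count balanced, but Rc is not thread-safe,
    // so this Waker must never leave the current thread.
    unsafe { Waker::from_raw(RawWaker::new(ptr, &VTABLE)) }
}

fn refcount_demo() -> (usize, usize, usize, u32) {
    let task = Rc::new(Task { wakes: Cell::new(0) });
    let w1 = waker(task.clone()); // strong count: 2
    let w2 = w1.clone(); // strong count: 3
    let after_clone = Rc::strong_count(&task);
    w2.wake_by_ref(); // count unchanged
    w2.wake(); // consumes w2, strong count: 2
    let after_wake = Rc::strong_count(&task);
    drop(w1); // strong count: 1
    let after_drop = Rc::strong_count(&task);
    (after_clone, after_wake, after_drop, task.wakes.get())
}
```

If `clone_waker` forgot to add a count, the drop of the second Waker would free the task while the first still pointed at it; if `wake_by_ref` dropped its Rc, every wake-up would silently lose a reference.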

Task implements wake_ and wake_by_ref_ to reschedule itself. All rescheduling does is take the executor from thread-local storage and push the task onto its TaskQueue.

impl Task {
    fn wake_(self: Rc<Self>) {
        Self::wake_by_ref_(&self)
    }

    fn wake_by_ref_(self: &Rc<Self>) {
        EX.with(|ex| ex.local_queue.push(self.clone()));
    }
}

Executor

With these components in place, building executors is pretty straightforward.

scoped_tls::scoped_thread_local!(pub(crate) static EX: Executor);

pub struct Executor {
    local_queue: TaskQueue,
    pub(crate) reactor: Rc<RefCell<Reactor>>,

    /// Make sure the type is `!Send` and `!Sync`.
    _marker: PhantomData<Rc<()>>,
}

When the user spawns a Task:

impl Executor {
    pub fn spawn(fut: impl Future<Output = ()> + 'static) {
        let t = Rc::new(Task {
            future: RefCell::new(fut.boxed_local()),
        });
        EX.with(|ex| ex.local_queue.push(t));
    }
}

All we have to do is box the incoming Future, build an Rc<Task>, and throw it into the execution queue.

So where is the Executor’s main loop? We can put it in block_on.

impl Executor {
    pub fn block_on<F, T, O>(&self, f: F) -> O
    where
        F: Fn() -> T,
        T: Future<Output = O> + 'static,
    {
        let _waker = waker_fn::waker_fn(|| {});
        let cx = &mut Context::from_waker(&_waker);

        EX.set(self, || {
            let fut = f();
            pin_utils::pin_mut!(fut);
            loop {
                // return if the outer future is ready
                if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
                    break t;
                }

                // consume all tasks
                while let Some(t) = self.local_queue.pop() {
                    let future = t.future.borrow_mut();
                    let w = waker(t.clone());
                    let mut context = Context::from_waker(&w);
                    let _ = Pin::new(future).as_mut().poll(&mut context);
                }

                // no task to execute now, it may ready
                if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
                    break t;
                }

                // block for io
                self.reactor.borrow_mut().wait();
            }
        })
    }
}

This is a bit complicated and can be broken down into the following steps:

  1. Create a dummy waker that doesn’t actually do anything.
  2. (in loop) Poll the incoming future and check whether it is ready; if so, return and end block_on.
  3. (in loop) Process all tasks in the TaskQueue: build the corresponding Waker for each and poll it.
  4. (in loop) At this point there are no tasks left to execute: either the main future is ready, or everything is waiting for IO. So check the main future again and return if it is ready.
  5. Since everyone is waiting for IO, call reactor.wait(). The reactor enters the syscall and waits until at least one IO is ready, then wakes the corresponding Task, which pushes it onto the TaskQueue.

At this point the Executor is basically complete.

IO components

The IO component registers itself with the Reactor when it encounters WouldBlock. Taking TcpStream as an example, we need to implement tokio::io::AsyncRead for it.

pub struct TcpStream {
    stream: StdTcpStream,
}

You need to add it to the Poller when creating TcpStream and remove it when destroying it:

impl From<StdTcpStream> for TcpStream {
    fn from(stream: StdTcpStream) -> Self {
        let reactor = get_reactor();
        reactor.borrow_mut().add(stream.as_raw_fd());
        Self { stream }
    }
}

impl Drop for TcpStream {
    fn drop(&mut self) {
        println!("drop");
        let reactor = get_reactor();
        reactor.borrow_mut().delete(self.stream.as_raw_fd());
    }
}

Implementing AsyncRead means issuing the read syscall. Because the fd was set to non-blocking when it was added to the Poller, the syscall is safe here.

impl tokio::io::AsyncRead for TcpStream {
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let fd = self.stream.as_raw_fd();
        unsafe {
            let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
            println!("read for fd {}", fd);
            match self.stream.read(b) {
                Ok(n) => {
                    println!("read for fd {} done, {}", fd, n);
                    buf.assume_init(n);
                    buf.advance(n);
                    Poll::Ready(Ok(()))
                }
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    println!("read for fd {} done WouldBlock", fd);
                    // modify reactor to register interest
                    let reactor = get_reactor();
                    reactor
                        .borrow_mut()
                        .modify_readable(self.stream.as_raw_fd(), cx);
                    Poll::Pending
                }
                Err(e) => {
                    println!("read for fd {} done err", fd);
                    Poll::Ready(Err(e))
                }
            }
        }
    }
}

The read syscall may return a correct result or an error. One error needs special handling: WouldBlock. On WouldBlock we need to register with the Reactor; here we are interested in Readable, which we express through the modify_readable function defined earlier. Once registration with the Reactor is done, we can safely return Poll::Pending, because we know we will be woken later.

The implementation article

Monoio

Motivation

While working on a mesh proxy (based on Envoy), I felt that, because of C++, we had to organize and design the code in very inelegant ways.

So I investigated replacing the existing version with linkerd2-proxy (a mesh proxy based on Rust + Tokio). Load tests showed a performance improvement of only about 10% in HTTP scenarios, and Envoy’s load-test data showed that a significant share of CPU time was spent on syscalls.

With Rust generics we can eliminate the runtime overhead of the dynamic-dispatch-based abstractions used in C++; on the IO side, we considered using io-uring instead of epoll.

Early research

In the early stage of the project, we compared the performance of several options: 1. Tokio 2. Glommio 3. bare epoll 4. bare io-uring. We found that bare io-uring did lead in performance, but the io-uring-based Glommio was not as good as expected. We tried forking and optimizing Glommio’s code and found that the project itself had significant problems; for example, the flags used to create the uring did not seem to be properly understood, and its Task implementation also performed poorly compared with Tokio’s.

Build your own wheel

Eventually we decided to build our own Runtime to meet our internal requirements and provide extreme performance.

This project was done by me and @Dyxushuai. During the implementation we drew heavily on projects such as Tokio and tokio-uring, and tried out some designs of our own.

Model discussion

Different design models have their own suitable scenarios.

Tokio uses a fair scheduling model whose internal scheduling logic is similar to Golang’s: tasks can move between threads to make the most of multiple cores.

Glommio is also a Rust Runtime. It is implemented on top of io-uring, its scheduling logic is simpler than Tokio’s, and it follows the thread-per-core model.

Both models have advantages and disadvantages. The former is more flexible and general-purpose, but the cost is not small:

  1. Poor scalability on multi-core machines.

    In my 1K echo test (with the latest version as of 2021-11-26), Tokio on 4 cores delivered only about 2.2 times the performance of 1 core, while our own Monoio scales almost linearly.

    [Figures: 1 Core, 4 Cores]

    A detailed test report is available here.

  2. The constraints on the Task itself cannot be ignored. If a Task can be scheduled across threads, it must implement Send + Sync. This is a significant limitation for user code.

    For example, to implement a cache service on the fair scheduling model, the map behind the cache has to be made Send + Sync with Atomic or Mutex and the like; on the thread-per-core model, a thread local is enough. Nginx and Envoy are also based on this model.
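The difference between the two cache styles can be sketched with the standard library. In this illustrative example, `LOCAL_CACHE` is a lock-free thread-local map of the kind a thread-per-core design can use, while `SharedCache` is the `Arc<Mutex<..>>` form needed when tasks may run on any thread. All names here (`LOCAL_CACHE`, `local_put`, `local_get`, `SharedCache`, `cache_demo`) are made up for the example.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

thread_local! {
    /// Per-thread cache: no locking, visible only to the thread that owns it.
    static LOCAL_CACHE: RefCell<HashMap<String, String>> = RefCell::new(HashMap::new());
}

fn local_put(key: &str, value: &str) {
    LOCAL_CACHE.with(|c| {
        c.borrow_mut().insert(key.to_string(), value.to_string());
    });
}

fn local_get(key: &str) -> Option<String> {
    LOCAL_CACHE.with(|c| c.borrow().get(key).cloned())
}

/// Cache shared across threads: has to be Send + Sync, hence Arc<Mutex<..>>.
type SharedCache = Arc<Mutex<HashMap<String, String>>>;

/// Returns (local value on this thread, local value seen by a worker thread,
/// shared value written by the worker thread).
fn cache_demo() -> (Option<String>, Option<String>, Option<String>) {
    local_put("k", "main-thread");

    let shared: SharedCache = Arc::new(Mutex::new(HashMap::new()));
    let shared_for_worker = Arc::clone(&shared);

    // The worker can write to the shared cache, but its thread-local cache is empty.
    let seen_by_worker = thread::spawn(move || {
        shared_for_worker
            .lock()
            .unwrap()
            .insert("k".to_string(), "worker".to_string());
        local_get("k")
    })
    .join()
    .unwrap();

    let from_shared = shared.lock().unwrap().get("k").cloned();
    (local_get("k"), seen_by_worker, from_shared)
}
```

The thread-local version pays no synchronization cost, but each thread has its own copy of the data; whether that is acceptable depends on the service, which is part of the trade-off discussed here.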

But thread-per-core is not a silver bullet. In a business system, for example, different requests may involve different processing logic: some long-lived connections require a lot of computation, while others consume hardly any CPU. Under this model, that can easily lead to imbalance between CPU cores, with one core fully loaded while another sits idle.

Event-driven

This section mainly compares io-uring and epoll.

epoll is only a notification mechanism; the actual work is still done by user code issuing syscalls directly, such as read. In scenarios with frequent syscalls, the frequent switching between user mode and kernel mode consumes a lot of resources. io-uring performs syscalls asynchronously, and even without SQ_POLL enabled it can greatly reduce the number of syscalls.

The problems with io-uring are the following:

  1. Compatibility. Leaving platform compatibility aside (io-uring is Linux only, whereas epoll has counterparts on other platforms and can be supported seamlessly through the well-established mio), Linux itself imposes requirements on the kernel version, and performance differs between versions. Large companies often maintain their own modified kernels, so keeping up with backports can be a headache. The development experience is also harder for Mac/Windows users.
  2. Buffer lifetime. io-uring is fully asynchronous: once an Op has been pushed to the SQ, its buffer must not be moved and must remain valid until the syscall completes or the Cancel Op has been executed. Managing buffer lifetimes is a problem in C/C++ and in Rust alike. epoll does not have this problem, because the syscall is issued by the user and the buffer cannot be touched while the syscall is in progress, so it is guaranteed to stay valid until the syscall returns.

Lifetimes, IO interface and GAT

The previous section mentioned this problem with io-uring: some mechanism is needed to ensure that the buffer stays valid while the Op is executing.

Consider the following case:

  1. The user creates a buffer.
  2. The user takes a reference to the buffer (whether & or &mut) to do a read or write.
  3. The Runtime returns a Future, but the user drops it right away.
  4. Now nobody holds a reference to the buffer, so the user can drop the buffer itself.
  5. However, the address and length of the buffer have already been submitted to the kernel; the Op may be about to be processed or may already be in progress. We can push a CancelOp, but we cannot guarantee that the CancelOp is consumed immediately.
  6. The kernel may then be operating on invalid memory, and if that memory has been reused by the user program, memory corruption results.

If Rust had Async Drop, this could still be made to work, taking references and using buffers in the usual way; without it, however, we cannot guarantee that the kernel’s reads and writes to the buffer are cancelled in time.

Therefore, it is hard to guarantee the validity of the buffer without taking ownership of it. This poses a new challenge for the IO interface: ordinary IO interfaces only need to be given a reference (& or &mut), whereas we have to be given ownership.
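The shape of an ownership-passing interface can be shown without io-uring at all. The sketch below is synchronous and built on `std::io::Read`, so it only illustrates the calling convention: the buffer goes in by value and comes back next to the result. The `BufResult` alias mirrors the name used in the trait definitions in this article, while `read_owned` and `owned_buffer_demo` are hypothetical.

```rust
use std::io::{self, Read};

/// The result of an operation together with the buffer that was lent to it.
type BufResult<T, B> = (io::Result<T>, B);

/// Takes the buffer by value and hands it back with the result, so the caller
/// cannot touch or free it while the operation is in flight.
fn read_owned<R: Read>(src: &mut R, mut buf: Vec<u8>) -> BufResult<usize, Vec<u8>> {
    // In an io-uring runtime this would return a future, and the buffer would
    // be parked inside the in-flight Op until the completion arrives.
    let res = src.read(&mut buf);
    (res, buf)
}

fn owned_buffer_demo() -> (usize, Vec<u8>) {
    let mut src: &[u8] = b"hello world";
    let buf = vec![0u8; 5];
    // `buf` is moved into the call; only the returned buffer can be used afterwards.
    let (res, buf) = read_owned(&mut src, buf);
    let n = res.expect("reading from an in-memory slice does not fail");
    (n, buf[..n].to_vec())
}
```

Because the buffer is returned even on error, callers can reuse it across calls, which is why both the success and the failure path carry it back.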

For this part of the design we referred to tokio-uring and defined it as traits. These traits require GAT (generic associated types) to be enabled.

/// AsyncReadRent: async read with a ownership of a buffer
pub trait AsyncReadRent {
    /// The future of read Result<size, buffer>
    type ReadFuture<'a, T>: Future<Output = BufResult<usize, T>>
    where
        Self: 'a,
        T: 'a;
    /// The future of readv Result<size, buffer>
    type ReadvFuture<'a, T>: Future<Output = BufResult<usize, T>>
    where
        Self: 'a,
        T: 'a;

    /// Same as read(2)
    fn read<T: IoBufMut>(&self, buf: T) -> Self::ReadFuture<'_, T>;
    /// Same as readv(2)
    fn readv<T: IoVecBufMut>(&self, buf: T) -> Self::ReadvFuture<'_, T>;
}

/// AsyncWriteRent: async write with a ownership of a buffer
pub trait AsyncWriteRent {
    /// The future of write Result<size, buffer>
    type WriteFuture<'a, T>: Future<Output = BufResult<usize, T>>
    where
        Self: 'a,
        T: 'a;
    /// The future of writev Result<size, buffer>
    type WritevFuture<'a, T>: Future<Output = BufResult<usize, T>>
    where
        Self: 'a,
        T: 'a;

    /// Same as write(2)
    fn write<T: IoBuf>(&self, buf: T) -> Self::WriteFuture<'_, T>;

    /// Same as writev(2)
    fn writev<T: IoVecBuf>(&self, buf_vec: T) -> Self::WritevFuture<'_, T>;
}

Like Tokio, we also provide an Ext with a default implementation:

pub trait AsyncReadRentExt<T: 'static> {
    /// The future of Result<size, buffer>
    type Future<'a>: Future<Output = BufResult<usize, T>>
    where
        Self: 'a,
        T: 'a;

    /// Read until buf capacity is fulfilled
    fn read_exact(&self, buf: T) -> <Self as AsyncReadRentExt<T>>::Future<'_>;
}

impl<A, T> AsyncReadRentExt<T> for A
where
    A: AsyncReadRent,
    T: 'static + IoBufMut,
{
    type Future<'a>
    where
        A: 'a,
    = impl Future<Output = BufResult<usize, T>>;

    fn read_exact(&self, mut buf: T) -> Self::Future<'_> {
        async move {
            let len = buf.bytes_total();
            let mut read = 0;
            while read < len {
                // Lend the not-yet-filled tail of the buffer to the read call.
                let slice = buf.slice(read..len);
                let (r, slice_) = self.read(slice).await;
                // Take ownership of the whole buffer back.
                buf = slice_.into_inner();
                match r {
                    Ok(r) => {
                        read += r;
                        if r == 0 {
                            return (Err(std::io::ErrorKind::UnexpectedEof.into()), buf);
                        }
                    }
                    Err(e) => return (Err(e), buf),
                }
            }
            (Ok(read), buf)
        }
    }
}

pub trait AsyncWriteRentExt<T: 'static> {
    /// The future of Result<size, buffer>
    type Future<'a>: Future<Output = BufResult<usize, T>>
    where
        Self: 'a,
        T: 'a;

    /// Write all
    fn write_all(&self, buf: T) -> <Self as AsyncWriteRentExt<T>>::Future<'_>;
}

impl<A, T> AsyncWriteRentExt<T> for A
where
    A: AsyncWriteRent,
    T: 'static + IoBuf,
{
    type Future<'a>
    where
        A: 'a,
    = impl Future<Output = BufResult<usize, T>>;

    fn write_all(&self, mut buf: T) -> Self::Future<'_> {
        async move {
            let len = buf.bytes_init();
            let mut written = 0;
            while written < len {
                // Lend the not-yet-written tail of the buffer to the write call.
                let slice = buf.slice(written..len);
                let (r, slice_) = self.write(slice).await;
                // Take ownership of the whole buffer back.
                buf = slice_.into_inner();
                match r {
                    Ok(r) => {
                        written += r;
                        if r == 0 {
                            return (Err(std::io::ErrorKind::WriteZero.into()), buf);
                        }
                    }
                    Err(e) => return (Err(e), buf),
                }
            }
            (Ok(written), buf)
        }
    }
}

Enabling GAT makes a lot of things easier.

We put a lifetime on the trait's associated Future type so that the future can capture &self, instead of having to clone parts of self or define a separate struct with a lifetime parameter.
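As a minimal sketch of this idea (GAT has been stable since Rust 1.65, so the snippet needs no feature flag), the trait below declares an associated future type with a lifetime parameter, which lets the returned future borrow `self`. The names `ReadLen`, `Source`, `BorrowedLen`, and `poll_once` are hypothetical and exist only for illustration; they are not part of Monoio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical trait: the associated future carries a lifetime,
/// so an implementation may borrow `self` instead of cloning it.
trait ReadLen {
    type LenFuture<'a>: Future<Output = usize>
    where
        Self: 'a;

    fn read_len(&self) -> Self::LenFuture<'_>;
}

struct Source {
    data: Vec<u8>,
}

/// A named future that holds a plain reference to the source.
struct BorrowedLen<'a> {
    src: &'a Source,
}

impl<'a> Future for BorrowedLen<'a> {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<usize> {
        // Ready immediately; a real IO future would register a waker first.
        Poll::Ready(self.src.data.len())
    }
}

impl ReadLen for Source {
    type LenFuture<'a> = BorrowedLen<'a> where Self: 'a;

    fn read_len(&self) -> Self::LenFuture<'_> {
        BorrowedLen { src: self }
    }
}

/// No-op waker so the sketch can poll without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```

The future returned by `read_len` borrows the `Source` directly, with no `Rc` and no cloning; the borrow checker guarantees that the source outlives it.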

Defining a Future

How do you define a Future? In general, you define a struct and implement the Future trait for it. The key is the poll function, which takes a Context and synchronously returns a Poll. Implementing poll usually means managing state by hand, which is hard to write and error-prone.
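To make the manual state management concrete, here is a small hand-written state machine, offered as an illustrative sketch rather than anything taken from Monoio. The hypothetical `YieldThenValue` future returns Pending on its first poll (after asking to be woken) and Ready on its second; `CountingWake` and `drive` are helper names invented for the example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written state machine: yields once, then completes with a value.
enum YieldThenValue {
    Start(u32),
    Yielded(u32),
    Done,
}

impl Future for YieldThenValue {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        // The enum is Unpin, so we may take a plain mutable reference.
        let this = self.get_mut();
        match *this {
            YieldThenValue::Start(v) => {
                *this = YieldThenValue::Yielded(v);
                // Ask to be polled again, then hand control back.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            YieldThenValue::Yielded(v) => {
                *this = YieldThenValue::Done;
                Poll::Ready(v)
            }
            YieldThenValue::Done => panic!("polled after completion"),
        }
    }
}

/// Waker that only counts how many times it was woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the future to completion in a loop.
/// Returns (value, number of polls, number of wakes observed).
fn drive(mut fut: YieldThenValue) -> (u32, usize, usize) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(v) = Pin::new(&mut fut).poll(&mut cx) {
            return (v, polls, counter.0.load(Ordering::SeqCst));
        }
    }
}
```

Even this two-step future needs three explicit states and a rule for what happens when it is polled after completion, which is why hand-written futures with real IO become error-prone quickly.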

At this point you might ask: can't we just use async and await? Indeed, an async block generates a state machine much like the one you would write by hand. The problem is that the generated type has no name, so it is hard to use as an associated type. You can enable type_alias_impl_trait and use an opaque type as the associated type, or accept some runtime overhead and use Box<dyn Future>.

Generating a Future

Besides using an async block, the conventional way is to manually define a struct that implements Future. There are two kinds of Future:

  1. A Future with ownership, which needs no extra lifetime annotation. This kind of Future is not tied to any other structure. If it has to depend on something that is not Copy, consider a shared-ownership type such as Rc or Arc.
  2. A Future with a reference, which is a struct carrying a lifetime parameter. For example, the signature of AsyncReadExt::read in Tokio is fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>. The Read<'a, Self> constructed here captures references to self and buf, which has no runtime overhead compared with shared ownership. However, this kind of Future is awkward to use as an associated type of a trait: you have to enable generic_associated_types and type_alias_impl_trait and then use an opaque type.
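The contrast between the two kinds can be sketched as follows. This is an illustrative example only: `Counter`, `OwnedHit`, `BorrowedHit`, and `poll_once` are hypothetical names, and both futures complete immediately so that the difference in how they hold their target stays in focus.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct Counter {
    hits: u32,
}

/// Kind 1: owns a shared handle, so the type needs no lifetime parameter.
/// The price is reference counting and a runtime borrow check.
struct OwnedHit {
    counter: Rc<RefCell<Counter>>,
}

impl Future for OwnedHit {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        let mut counter = self.counter.borrow_mut();
        counter.hits += 1;
        Poll::Ready(counter.hits)
    }
}

/// Kind 2: borrows its target, so the type carries a lifetime parameter.
/// There is no runtime overhead, but the type is harder to name in a trait.
struct BorrowedHit<'a> {
    counter: &'a mut Counter,
}

impl<'a> Future for BorrowedHit<'a> {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        self.counter.hits += 1;
        Poll::Ready(self.counter.hits)
    }
}

/// No-op waker so the sketch can poll without a runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future once and returns its output if it is already ready.
fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}
```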

Defining the IO trait

In general, IO interfaces are defined in poll form (for example poll_read), and any wrapper around IO should be built on this trait (call it the base trait for now).

For a friendlier user-facing interface, an additional Ext trait is usually provided, relying mostly on its default implementations. The Ext trait is implemented automatically for every type that implements the base trait. For example, read returns a Future, and awaiting that Future is obviously easier than managing state and polling by hand.

So why is the base trait defined in poll form? Can't we go straight to Futures? Because the poll form is synchronous, it does not need to capture anything, which makes it easy to define and quite general. If you go straight to Futures, you either hard-code the returned Future type, as Ext does (then the trait cannot be wrapped or implemented by users, which defeats the purpose of defining a trait), or make the Future type an associated type (which, as mentioned earlier, cannot carry a lifetime without GAT).

To summarize, in current stable Rust the only way to define IO interfaces is a base trait in poll form plus an Ext trait in Future form.

Once GAT is enabled, this becomes possible: we can define a Future with a lifetime directly as an associated type of the trait, and it can capture self.

Is this a silver bullet? No. The catch is that once you adopt the GAT-based pattern you have to use it throughout; switching back and forth between poll and GAT is painful. Wrapping a poll-style interface in a Future is easy, because it maintains its own state (the simplest way is poll_fn). The reverse is hard: you cannot easily store a Future that carries a lifetime. It can be done with unsafe hacks (at a cost), but with many restrictions, and it is not recommended. If you really must, monoio-compat implements Tokio's AsyncRead and AsyncWrite on top of the GAT-based Futures.
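The easy direction, from a poll-style interface to a Future, can be sketched with the standard library's `std::future::poll_fn`. The `Countdown` type, its `poll_done` method, and the `done` and `block_on` helpers are hypothetical names made up for this example.

```rust
use std::future::{poll_fn, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical poll-style source that keeps its own state.
struct Countdown {
    remaining: u32,
}

impl Countdown {
    /// Poll-form interface: synchronous, captures nothing.
    fn poll_done(&mut self, cx: &mut Context<'_>) -> Poll<&'static str> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            // Not ready yet: request another poll.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Adapts the poll-style method into a Future with `poll_fn`.
fn done(c: &mut Countdown) -> impl Future<Output = &'static str> + '_ {
    poll_fn(move |cx| c.poll_done(cx))
}

/// No-op waker; the loop below simply polls again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling block_on, sufficient for this sketch only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
            return v;
        }
    }
}
```

All the state lives in `Countdown`, so the adapter is a one-liner. Going the other way would require storing the borrowed future somewhere between polls, which is where the difficulty described above comes from.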

Buffer management

Buffer management follows the tokio-uring design.

The user hands over ownership of the buffer, and ownership is returned when the Future completes. A Slab maintains global state: when an Op is dropped, ownership of its buffer is transferred to that global state, and the buffer is actually destroyed when the CQE arrives. On normal completion, ownership is returned to the user.
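The life cycle described above might be modelled roughly as follows. This is a simplified sketch, not Monoio's actual code: a `HashMap` keyed by a counter stands in for the Slab, and the names `Lifecycle`, `InFlight`, `submit`, `drop_op`, and `complete` are assumptions made for illustration.

```rust
use std::any::Any;
use std::collections::HashMap;

/// State of one in-flight operation, keyed by its user_data.
enum Lifecycle {
    /// Submitted; the Op future still exists and owns the buffer.
    Submitted,
    /// The Op future was dropped early. The buffer is parked here so it
    /// stays valid while the kernel may still read or write it.
    Ignored(Box<dyn Any>),
}

/// Simplified stand-in for the slab of in-flight operations.
#[derive(Default)]
struct InFlight {
    next_key: u64,
    ops: HashMap<u64, Lifecycle>,
}

impl InFlight {
    /// Registers a new operation and returns its key (the user_data).
    fn submit(&mut self) -> u64 {
        let key = self.next_key;
        self.next_key += 1;
        self.ops.insert(key, Lifecycle::Submitted);
        key
    }

    /// The Op was dropped before completion: park its buffer in the table.
    fn drop_op(&mut self, key: u64, buf: Box<dyn Any>) {
        if let Some(state) = self.ops.get_mut(&key) {
            *state = Lifecycle::Ignored(buf);
        }
    }

    /// A CQE arrived for `key`: remove the entry.
    /// Returns true if a parked buffer was destroyed at this point.
    fn complete(&mut self, key: u64) -> bool {
        matches!(self.ops.remove(&key), Some(Lifecycle::Ignored(_)))
    }
}
```

The important property is that a buffer belonging to a dropped Op is only freed inside `complete`, that is, after the kernel has reported that it is finished with it.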

Time Driver design

Many scenarios need timers, for example a timeout, where you select over two futures, one of which is the timer. A Runtime must therefore support asynchronous sleep.

Timer management and wake up

Glommio's implementation of this part is relatively simple: it uses a BTreeMap to maintain the Instant -> Waker mapping, and each time it calls split_off at the current time to take out all expired timers, wakes them, and computes the interval until the next wakeup. That interval is then passed as a parameter when parking the driver. Tokio instead has its own time wheel implementation, which is more complex but more efficient (at the cost of being less precise than Glommio's implementation).
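The BTreeMap approach can be sketched in a few lines. This is an illustration of the technique, not Glommio's code: the `Timers` type and its methods are hypothetical, and timer ids stand in for the `Waker`s a real runtime would store.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// Hypothetical timer store: deadline -> ids of the timers due then.
/// A real runtime would store `Waker`s instead of ids.
#[derive(Default)]
struct Timers {
    by_deadline: BTreeMap<Instant, Vec<u32>>,
}

impl Timers {
    fn insert(&mut self, deadline: Instant, id: u32) {
        self.by_deadline.entry(deadline).or_default().push(id);
    }

    /// Removes and returns all timers whose deadline is before `now`.
    /// `split_off(&now)` keeps keys < now in place and returns keys >= now,
    /// so a timer due exactly at `now` is picked up on the next call.
    fn expire(&mut self, now: Instant) -> Vec<u32> {
        let later = self.by_deadline.split_off(&now);
        let expired = std::mem::replace(&mut self.by_deadline, later);
        expired.into_values().flatten().collect()
    }

    /// Time until the nearest remaining deadline, to be used as the
    /// timeout when parking the driver. `None` means no timer is pending.
    fn next_timeout(&self, now: Instant) -> Option<Duration> {
        self.by_deadline
            .keys()
            .next()
            .map(|deadline| deadline.saturating_duration_since(now))
    }
}
```

A driver loop would call `expire`, wake everything it returns, and then pass `next_timeout` to its park call so that it never sleeps past the nearest deadline.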

For our performance-first implementation, we chose a time wheel solution similar to Tokio’s.

Integration with the Driver

Under epoll, before we wait we need to check the nearest timer. If there is one, the time remaining until it fires must be passed to the wait as its timeout. Otherwise, if no IO becomes ready during that period, we miss the timer's expected wakeup time: a user who asked for a 100ms timeout might be woken after 200ms, which defeats the purpose.

Tokio does this internally with epoll plus a time wheel. epoll serves as the IO Driver, and the Timer Driver is layered on top of it. Before the Timer Driver enters the syscall, it computes the interval until the nearest event in the time wheel and passes it as the epoll wait timeout.

Why not use timerfd and rely on epoll directly? Because it is rather heavyweight: creating a timerfd and adding or removing it with epoll_ctl are all syscalls, and timers cannot be merged at coarse granularity the way a time wheel allows.

However, io_uring's enter does not support passing in a timeout. We can only achieve this by pushing a TimeoutOp to the SQ.

Scheme 1

Push a TimeoutOp when inserting an element into an empty slot of the time wheel, and push a TimeoutRemoveOp when cancellations bring that slot's count back to 0 (the removal can be skipped, at the cost of one extra spurious wakeup).

For example, suppose we create five 10ms timeouts that land in the same slot of the time wheel. We push a 10ms TimeoutOp to the SQ when the count in that slot goes from 0 to 1.

Scheme 2

Before each wait, compute the nearest timeout, push it to the SQ, and then wait; the TimeoutOp uses offset = 1.

The offset parameter means that the TimeoutOp completes once $offset CQEs have completed, or when the timeout expires.

This command will register a timeout operation. The addr field must contain a pointer to a struct timespec64 structure, len must contain 1 to signify one timespec64 structure, timeout_flags may contain IORING_TIMEOUT_ABS for an absolute timeout value, or 0 for a relative timeout. off may contain a completion event count. A timeout will trigger a wakeup event on the completion ring for anyone waiting for events. A timeout condition is met when either the specified timeout expires, or the specified number of events have completed. Either condition will trigger the event. If set to 0, completed events are not counted, which effectively acts like a timer. io_uring timeouts use the CLOCK_MONOTONIC clock source. The request will complete with -ETIME if the timeout got completed through expiration of the timer, or 0 if the timeout got completed through requests completing on their own. If the timeout was cancelled before it expired, the request will complete with -ECANCELED. Available since 5.4.

With this scheme we push to the SQ before every wait. The advantage is that nothing needs to be removed (the TimeoutOp is consumed on every return), and the implementation is simple: there is no need to keep track of the Op's user_data in order to push a TimeoutRemoveOp.

Scheme 3

Similar to scheme 2, except that offset in the TimeoutOp is set to 0.

This is trickier to implement: offset = 0 makes it a pure timer, independent of the number of completed CQEs, which only completes when the timeout actually expires. That means we either push a TimeoutRemoveOp or bear the cost of a spurious wakeup (Glommio implements a similar scheme and accepts the latter cost).

Discussion

A TimeoutOp should be inserted as late as possible, because it may be cancelled. With scheme 1, if a slot's count goes 0->1->0->1 before a wait, 2 TimeoutOps and 2 TimeoutRemoveOps are pushed, which is unnecessary.

Schemes 2 and 3 act at the same point in execution as Tokio and Glommio do under epoll. They differ in the details:

  • In scheme 2, the TimeoutOp completes as soon as any CQE completes, so nothing needs to be removed and no user_data needs to be maintained. It is very simple to implement and saves the overhead of pushing a TimeoutRemoveOp and having the kernel process it.
  • The advantage of scheme 3 over scheme 2 is that when there are many waits, scheme 2 pushes a TimeoutOp every time, while scheme 3 can check whether the previous TimeoutOp has been consumed and skip some pushes. Scheme 3 also has a downside relative to scheme 2: a TimeoutRemoveOp has to be pushed when the timeout is cancelled.

In our actual business scenarios, timers are mostly used as timeouts, with a few used for periodic polling. In the timeout case, timers are typically registered and then removed, and a timeout actually firing is not the hot path, so we tentatively chose scheme 2. Scheme 2 is also simple to implement, so the cost of replacing it in a later optimization is small.

Cross-thread communication

A thread-per-core runtime that cannot communicate across threads is severely limited. A common case, for example, is a single thread that pulls configuration and distributes it to the thread-local storage of every thread.

If all you want is cross-thread communication, no Runtime support is needed: lock-free data structures or cross-thread locks will do.

But we want integration at the Runtime level. For example, thread A holds the rx of a channel and thread B holds the tx; B sends data and A can await on rx. The difficulty is that the reactor on thread A may already be blocked in the kernel waiting on the CQ, so when the task is woken we also need to wake its thread.

Ability to Unpark

So we need to add an additional Unpark interface to the Driver trait to actively wake up across threads.

Under epoll, Tokio implements this internally by registering an eventfd. Because Tokio's scheduling model relies on cross-thread wakeups, it always attaches an eventfd to epoll, whether or not you use any of the sync data structures Tokio provides. The core of our implementation does not depend on this; it is only needed when our channel implementation is used. We therefore gate the relevant code behind conditional compilation (the "sync" feature) to stay as close to zero cost as possible.

How do we hook up the eventfd under io_uring? Similar to what we did for park_timeout in the Time Driver, we can simply push a ReadOp that reads 8 bytes, with fd set to the eventfd. Reads and writes on an eventfd are always 8 bytes (a u64).

Note: the documentation mentions two syscall flags (IORING_REGISTER_EVENTFD and IORING_REGISTER_EVENTFD_ASYNC), but they serve a different purpose. Ref: unixism.net/loti/ref-io…

When do we need to push the eventfd read? We keep an internal flag that records whether an eventfd read is already in the ring. Before sleeping, if one is present we sleep directly; if not, we push one and set the flag.

When consuming the CQ, on seeing the user_data that corresponds to the eventfd we clear the flag, so that the read is pushed again before the next sleep.

When we need to unpark a thread, we just take its eventfd and write 1u64 to it. The fd then becomes readable, which makes the ring return from the syscall.

We maintain UnparkHandle in a global Map so that each thread can wake up other threads. At thread creation time, we register our Weak reference to UnparkHandle globally.

When we need to wake up across threads, we simply grab the UnparkHandle from the global Map, try upgrade, and write data. To reduce access to the entire Map, we cache this Map in each thread.
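The registry part of this design could look roughly like the sketch below. It rests on several assumptions: an atomic counter stands in for the eventfd (so `unpark` increments it instead of writing 1u64 to a file descriptor), a `BTreeMap` behind a `Mutex` stands in for the global Map, and the per-thread cache is omitted. `UnparkHandle` is the name used in the text; `UNPARKERS`, `register`, and `unpark` are hypothetical.

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Weak};

/// Stand-in for the eventfd owned by one worker thread.
struct UnparkHandle {
    /// Plays the role of the eventfd counter.
    pending: AtomicU64,
}

impl UnparkHandle {
    /// In the real driver this would write 1u64 to the eventfd.
    fn unpark(&self) {
        self.pending.fetch_add(1, Ordering::SeqCst);
    }
}

/// Global map: thread id -> weak reference to that thread's handle.
static UNPARKERS: Mutex<BTreeMap<usize, Weak<UnparkHandle>>> =
    Mutex::new(BTreeMap::new());

/// Called at thread creation: publish a Weak reference globally.
fn register(thread_id: usize, handle: &Arc<UnparkHandle>) {
    UNPARKERS
        .lock()
        .unwrap()
        .insert(thread_id, Arc::downgrade(handle));
}

/// Wakes the given thread if it is still alive. Returns false, and
/// drops the stale entry, if the thread is unknown or already gone.
fn unpark(thread_id: usize) -> bool {
    let mut map = UNPARKERS.lock().unwrap();
    let handle = map.get(&thread_id).and_then(Weak::upgrade);
    match handle {
        Some(handle) => {
            handle.unpark();
            true
        }
        None => {
            map.remove(&thread_id);
            false
        }
    }
}
```

Storing `Weak` references means the registry never keeps a finished thread's handle alive; a failed `upgrade` is simply treated as "nothing to wake".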

From reading the eventfd implementation, the kernel takes a lock internally and guarantees that the 8-byte u64 is written in one go, so there is no ordering problem. The current implementation therefore calls libc::write directly. (Wow, so unsafe!)

Integrating the Waker

In the purely thread-local case, our wakeup logic looks like this:

  1. We want to wait for a future that executes on this thread.
  2. Since the event source is uring, when the Future is polled we store the task's Waker in the storage associated with the Op.
  3. Uring produces an event and wakes the Waker.
  4. When the Waker runs, it puts the task back into the local thread's run queue.

Under the uring Driver, the event source is uring, so uring is responsible for storing and waking the Waker; under the Time Driver, the event source is the time wheel, so it is likewise responsible for storing and waking the Waker.

Now the event source is another thread. Take a oneshot channel as an example: when rx is polled, the Waker must be stored in the channel's shared state; after tx sends, it takes the Waker from the shared state and wakes it. The Waker's wake logic can no longer blindly push the task to the local queue; it must schedule the task onto the queue of the thread the task belongs to.

So each Executor needs a shared_queue to receive Wakers pushed from remote threads. When a non-local Waker is woken, it adds itself to the target thread's queue.
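A waker that distinguishes local from remote wakeups might be sketched like this. It is an illustration under simplifying assumptions, not Monoio's implementation: task ids stand in for real tasks, a `Mutex<VecDeque>` stands in for the shared queue, and the names `TaskWaker`, `LOCAL_QUEUE`, `SharedQueue`, and `local_len` are hypothetical.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};
use std::thread::{self, ThreadId};

thread_local! {
    /// Run queue touched only by the owning thread, so no locking.
    static LOCAL_QUEUE: RefCell<VecDeque<usize>> = RefCell::new(VecDeque::new());
}

/// Queue that other threads may push into.
type SharedQueue = Arc<Mutex<VecDeque<usize>>>;

struct TaskWaker {
    task_id: usize,
    /// Thread whose executor owns the task.
    owner: ThreadId,
    /// Shared queue belonging to the owner's executor.
    shared: SharedQueue,
}

impl Wake for TaskWaker {
    fn wake(self: Arc<Self>) {
        if thread::current().id() == self.owner {
            // Local wakeup: cheap push onto the thread-local queue.
            LOCAL_QUEUE.with(|q| q.borrow_mut().push_back(self.task_id));
        } else {
            // Remote wakeup: push onto the owner's shared queue.
            // A real runtime would also unpark the owner thread here.
            self.shared.lock().unwrap().push_back(self.task_id);
        }
    }
}

/// Number of tasks in the current thread's local queue.
fn local_len() -> usize {
    LOCAL_QUEUE.with(|q| q.borrow().len())
}
```

The owning executor would drain its shared queue into the local one at the start of each loop iteration, so the common local path never takes the lock.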

Another reference implementation in Glommio:

The scheme mentioned above is to pass Waker across threads, which can support channel, mutex and other data structures in general.

Alternatively, instead of passing the Waker, the receiver can register the Waker in a thread-local data structure when it polls. When the sender has data, it does not wake the receiver's Waker directly; it wakes the receiver's thread, and that thread then polls all channels that have a registered Waker.

This polling approach is not efficient in some scenarios, and the scheme is not universal.

Executor design

An Executor under thread-per-core should be very simple:

  1. Keep a queue, push tasks at one end and consume them from the other.
  2. When there is nothing to do, enter the syscall and wait for at least one completion.
  3. After the wait returns, process completions one by one: apply the syscall results to the buffers and wake the corresponding tasks.

Under epoll the logic may indeed be that simple, but under io_uring some additional optimizations are possible.

Low latency or fewer syscalls

After pushing an SQE, we can submit() right away so the syscall completes as early as possible, reducing latency; or we can hold off until there is nothing else to do and then call submit_and_wait(1). Aiming for the highest possible performance, we chose the second option (Glommio and tokio-uring use the first). Test data shows that latency is in fact not high, and is sometimes even lower than Glommio's thanks to lower CPU utilization. It is also possible to decide dynamically, submitting more aggressively when load is low.

Starvation

Here, starvation is usually caused by problematic user code. Consider the following scenarios:

  1. Every time the user's Future is polled, it spawns a new task and returns Ready.
  2. The user's Future wakes itself immediately every time.
  3. The user's Future has too many state transitions, or its state transitions loop forever.

If we only process IO (submit, wait, and harvest) when no tasks are left, then in these scenarios none of the tasks that depend on IO can make progress, because the task queue never empties or the task never finishes.

For problems 1 and 2, the fix we propose is not to run tasks until the queue is empty, but to set an upper bound on how many are executed, and to force a submit and harvest once that bound is reached.
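The execution ceiling can be illustrated with a toy executor. Everything here is a hypothetical sketch: tasks are closures that return true when they want to run again immediately (modelling scenario 2), and `drive_io` is a counter standing in for submit, wait, and harvest.

```rust
use std::collections::VecDeque;

/// Hypothetical task: returns true if it wants to run again right away.
type Task = Box<dyn FnMut() -> bool>;

struct Executor {
    queue: VecDeque<Task>,
    /// Upper bound on task executions between two IO rounds.
    max_polls_per_tick: usize,
    /// How many times IO was driven (stand-in for real IO work).
    io_rounds: usize,
    /// Total number of task executions.
    polls: usize,
}

impl Executor {
    /// Stand-in for submitting SQEs and harvesting CQEs.
    fn drive_io(&mut self) {
        self.io_rounds += 1;
    }

    /// Runs at most `max_polls_per_tick` tasks, then always drives IO,
    /// even if the queue is still non-empty.
    fn tick(&mut self) {
        for _ in 0..self.max_polls_per_tick {
            let Some(mut task) = self.queue.pop_front() else {
                break;
            };
            self.polls += 1;
            if task() {
                // The task woke itself: it goes to the back of the queue.
                self.queue.push_back(task);
            }
        }
        self.drive_io();
    }
}
```

With a single task that always re-queues itself, the queue never empties, yet every `tick` still reaches `drive_io`, so tasks waiting on IO continue to make progress.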

For problem 3, a coop mechanism similar to Tokio's can limit the number of recursive polls, and thereby bound state transitions.

Advertisement

Monoio is still far from complete, and we look forward to your contributions 🙂

In addition, we have set up mirrors of crates.io and rustup in China. You are welcome to use RsProxy!