Ant Group CeresDB Team | Rust CPU Affinity

Author: Ruihang Xia / Post Editor: Zhang Handong

Brief

While reading about Apache Cassandra, I learned that ScyllaDB delivers a large performance improvement while remaining fully compatible with it. After digging further into its Thread per Core architecture, I started from a simple cache structure and implemented it in three different ways. This article compares the three implementations and closes with some things learned along the way.

Simply put, Thread per Core means binding each thread of the application to a CPU core and splitting the computation across the corresponding cores by sharding. This is a shared-nothing approach: each core independently holds the data it needs for its computation, which avoids redundant thread-synchronization overhead. At the same time, cores and worker threads correspond one to one, which reduces the overhead of context switching.

In waynexia/shard-affinity, I implement the same cache structure using ordinary unrestricted scheduling, using local sets to group computation tasks, and using tasks bound to cores and threads. These three implementations correspond to the threading.rs, local_set.rs, and affinity.rs files in the shard-affinity/load/src directory. Each of the three approaches is analyzed below. The original code referred to in this post lives in that repository and is partially omitted here for brevity.

Cache

Suppose we have a structure like Map<Id, Item> that caches the data we need. Requests can either append() to it or get() from it; thread synchronization is done through read/write locks, mutation goes through interior mutability, and the interface exposed to the outside takes &self.

pub struct CacheCell {
    items: RwLock<Map<Id, RwLock<Item>>>,
}

impl CacheCell {
    pub fn get(&self, id: Id, size: usize) -> Option<Bytes> {}
    pub fn append(&self, id: usize, bytes: Bytes) {}
}

First, to allow multiple tasks to operate on the cache at the same time and still get the expected results, we can either use a lock-free structure or serialize the concurrent operations with a lock. We notice that operations on different IDs do not affect each other, so the granularity at which thread synchronization applies can be reduced. Taking the Gorilla in-memory data structure that this cache is modeled on as an example, the IDs can be divided into groups, each managed by its own cell, lowering the lock granularity and supporting more concurrent operations.

Fig.7: Gorilla in-memory data structure

This structure was chosen for two reasons. First, it is used in a real production system, which makes it practically meaningful. Second, it is relatively simple and easy to implement, and the IDs are already sharded, which is convenient for what follows.
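
As a concrete sketch of the grouping (the shard count and the modulo routing below are illustrative assumptions, not necessarily what the repository uses), an ID can be mapped to the cell that manages it with a small routing function; the later examples use the same route_id idea.

// Illustrative only: the real shard count and routing function may differ.
type Id = usize;

const SHARD_NUM: usize = 16;

// Map an ID to the cell that manages it. Requests with different IDs usually
// land in different cells, take different locks, and therefore no longer
// serialize each other.
fn route_id(id: Id) -> usize {
    id % SHARD_NUM
}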

Threading

The cells are held in a Vec<CacheCell>, and each request is routed to one of them based on its ID.

impl ThreadingLoad {
    pub fn append(&self, id: Id, bytes: Bytes) {
        self.shards[route_id(id)].append(id, bytes);
    }
}

When it is used, a multi-threaded Tokio runtime is started and requests with different IDs are spawned onto it.

let rt = Builder::new_multi_thread().build().unwrap();
let load = ThreadingLoad::new();

rt.spawn(async move {
    let id = random::<usize>();
    load.append(id, bytes);
});

After that, it is up to Tokio to schedule the tasks and hand us the results when they are done; we do not care how a task is scheduled or which core the computation runs on. In exchange, all of the underlying structures pay the cost of being Send and Sync, and we never have to worry about strange results when several tasks operate on one object at the same time.
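
The price shows up once more than one request is in flight: the shared state travels between worker threads and every cell access goes through its lock. A rough illustration (the Arc wrapping and the request loop here are mine, not the benchmark's exact code):

use std::sync::Arc;

// Any worker thread may run any task, so the shared load must be Send + Sync
// and is cloned into each task behind an Arc.
let load = Arc::new(ThreadingLoad::new());
for _ in 0..128 {
    let load = Arc::clone(&load);
    rt.spawn(async move {
        let id = random::<usize>();
        load.append(id, Bytes::from(vec![0u8; 16 * 1024]));
    });
}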

LocalSet

The second approach uses Tokio's LocalSet. It binds the specified tasks to the same thread for execution. The benefit is that we can use things that are !Send.

Specifically, we already know from the above that operations on different IDs do not affect each other, which is what let us reduce the lock granularity. By the same reasoning, the data needed by tasks with different IDs does not overlap, so the situation where one piece of data may be accessed by several cores at the same time never arises, and we do not need to consider the visibility of our changes to other cores. Based on this, the performance cost of making the data Send and Sync can be saved as well: reference counting can drop from Arc to Rc, and the instruction barriers added to guarantee visibility can be removed.
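
A minimal, self-contained sketch of what this buys us (the RefCell-backed shard below is purely illustrative): tasks spawned onto a LocalSet stay on one thread, so they can hold !Send types such as Rc and avoid atomic reference counting and locking.

use std::cell::RefCell;
use std::rc::Rc;

use tokio::runtime::Builder;
use tokio::task::LocalSet;

fn main() {
    let rt = Builder::new_current_thread().build().unwrap();
    let local = LocalSet::new();

    // Rc + RefCell instead of Arc + RwLock: cheaper, but !Send.
    let shard = Rc::new(RefCell::new(Vec::<u8>::new()));

    let cloned = Rc::clone(&shard);
    local.spawn_local(async move {
        cloned.borrow_mut().extend_from_slice(b"some bytes");
    });

    // Drive the LocalSet until every spawned task has finished.
    rt.block_on(local);
    assert_eq!(shard.borrow().len(), 10);
}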

In terms of implementation, on my machine with 16 logical cores, all the shards are divided among 15 threads, and one more thread is used for task distribution. A channel connects the distribution thread to each worker thread for passing tasks. Two kinds of tasks are distributed:

enum Task {
    Append(Id, Bytes, oneshot::Sender<()>),
    Get(Id, usize, oneshot::Sender<()>),
}

Each variant carries the parameters the task needs plus a channel used to signal completion. Whenever a request arrives, the dispatch thread assembles the required parameters, sends them to the corresponding worker thread based on the ID, and waits for the result.

pub async fn append(&self, id: Id, bytes: Bytes) {
    let (tx, rx) = oneshot::channel();
    let task = Task::Append(id, bytes, tx);
    self.txs[route_id(id)].send(task).unwrap();

    rx.await.unwrap()
}
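
The receiving side can be sketched roughly as follows; LocalShard, the unbounded channel, and the current-thread runtime are assumed names and choices for illustration, not the repository's exact code.

use std::rc::Rc;

use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::task::LocalSet;

// Each worker thread owns its shard outright; LocalShard (an assumed name)
// never crosses threads, so it can freely use !Send types such as Rc.
fn run_worker(mut rx: mpsc::UnboundedReceiver<Task>) {
    let rt = Builder::new_current_thread().build().unwrap();
    let local = LocalSet::new();
    let shard = Rc::new(LocalShard::new());

    local.block_on(&rt, async move {
        while let Some(task) = rx.recv().await {
            let shard = Rc::clone(&shard);
            // spawn_local keeps the task on this thread, so capturing the Rc
            // is allowed even though Rc is !Send.
            tokio::task::spawn_local(async move {
                match task {
                    Task::Append(id, bytes, done) => {
                        shard.append(id, bytes);
                        let _ = done.send(());
                    }
                    Task::Get(id, size, done) => {
                        let _ = shard.get(id, size);
                        let _ = done.send(());
                    }
                }
            });
        }
    });
}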

Affinity

In the implementation above we have only tied together the data and the computation of a group of tasks, avoiding the thread-synchronization overhead. When the load is not balanced across the cores, noticeable operating-system scheduling can be observed at run time. In other words, only one of the two overheads mentioned at the beginning has been reduced; the context-switching overhead remains. The operating system's scheduler usually does not understand the application's behavior, so in the third approach we bind each thread to a core, effectively telling the operating system how to schedule our threads.

Thread allocation is the same as in the LocalSet scheme above: the shards are assigned to all threads except the one used for distribution, and each thread is bound to a core. CPU affinity is set via the core_affinity crate.

let core_ids = core_affinity::get_core_ids().unwrap();
for core_id in core_ids {
    thread::spawn(move || {
        core_affinity::set_for_current(core_id);
        // ... run this worker's task loop on the pinned core
    });
}

Besides setting CPU affinity, there are a few other differences from the previous scheme. First, the channels now carry already-constructed futures rather than parameters from which the worker builds a future. Second, the runtime here is a simple FIFO queue. Finally, each thread's cache is kept in thread-local storage.

self.runtime.spawn(route_id(id), async move {
    thread_local!(static SHARD: AffinityShard = AffinityShard::new());

    SHARD.with(|shard| {
        shard.append(id, bytes);
    });

    tx.send(()).unwrap();
});
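
The runtime behind this spawn(shard, future) call can be sketched roughly as follows; FifoRuntime, the std channel, and driving each future with futures::executor::block_on are my simplifications (assuming the core_affinity and futures crates), not the repository's actual implementation.

use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc;
use std::thread;

type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send>>;

// One FIFO queue per pinned worker; spawn(shard, future) just pushes the
// ready-made future onto the matching queue.
struct FifoRuntime {
    senders: Vec<mpsc::Sender<BoxedTask>>,
}

impl FifoRuntime {
    fn new(core_ids: Vec<core_affinity::CoreId>) -> Self {
        let mut senders = Vec::new();
        for core_id in core_ids {
            let (tx, rx) = mpsc::channel::<BoxedTask>();
            senders.push(tx);
            thread::spawn(move || {
                // Pin the worker to its core, then drain tasks in arrival order.
                core_affinity::set_for_current(core_id);
                while let Ok(task) = rx.recv() {
                    // Polling each task to completion is enough for this
                    // in-memory workload; a real runtime would interleave them.
                    futures::executor::block_on(task);
                }
            });
        }
        Self { senders }
    }

    fn spawn<F>(&self, shard: usize, fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.senders[shard].send(Box::pin(fut)).unwrap();
    }
}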

These are merely implementation differences, and because the memory inside the cache is allocated from the heap through the default allocator, TLS does not really make a difference here; we will come back to this later.

At this point each compute thread can, to some extent, be reasoned about as a single-threaded model, and the whole system becomes non-preemptive and cooperatively scheduled: a task yields by itself via Rust's async/await while waiting for a resource. Beyond the points already mentioned, this leaves a lot of room for further development.

This affinity scheme is also a good setting for practicing NUMA on the application side. Combined with the TLS mentioned above, another option is to use a NUMA-aware memory allocator. However, my device does not support NUMA, so I did not test this further.

Test

There are three binaries under shard_affinity/src, one simple benchmark for each of the three approaches. The workload parameters can be found in shard_affinity/src/lib.rs. In my environment, the time taken by 128 concurrent writes and 4096 concurrent reads of 16 KB data is shown below. To keep the data set fixed, the ID range is set to 0 to 1023.

Figure 2: Local test results. The vertical axis is latency in milliseconds; lower is better.

As you can see, local set and affinity do not perform as well as threading. A preliminary analysis: in both the local set and the affinity scheme, one thread serves as the entry point that generates and distributes tasks, which adds the cost of task routing. During the test the CPU load was visibly uneven, and since each simulated task has a fairly short execution time, the routing thread hits its bottleneck earlier.

After adjusting the number of worker threads to 8 (half the number of logical cores), the gap between threading and affinity shrinks. For the gap that still remains, flamegraph analysis suggests it may come from affinity having to send and receive a request and a result over channels for every task.

Figure 3: Results after adjusting the number of workers. The vertical axis is latency in milliseconds; lower is better.

The sharding scheme also requires all in-memory data, i.e. the state, to be pre-distributed to each core. Under affinity, when a core becomes overloaded due to hot spots, rebalancing can be a time-consuming operation, which makes it less flexible than threading. In addition, how computation is distributed also matters, such as the current approach of having one thread distribute to all the others; given the more complex mix of computational load in a real system, choosing the best way to distribute tasks requires careful decision making.

Others

In the affinity implementation, most components are hand-rolled in a simple form. In fact there are many excellent Thread per Core frameworks that can simplify development under this architecture, such as seastar, the framework used by ScyllaDB mentioned at the beginning; many of its documents were consulted while writing this article. Rust has a similar framework, glommio, a relatively new library that has just published its first official release.

Under the Thread per Core architecture, it is not only the application logic that needs to change; many commonly used components need to change as well. Components designed for ordinary multi-threaded scenarios pay the cost of thread synchronization, so, for example, Arc can be replaced with Rc. All of this has to be considered, and hopefully a good ecosystem can grow up around it.

Conclusion

After this simple comparison of implementations and performance, from my point of view Thread per Core is an approach well worth trying. It can simplify the scenarios that have to be considered during development, it suits today's servers with dozens or hundreds of cores, and it already has mature practice in ScyllaDB. However, for a system that has largely taken shape, it is a big change. The improvements we expect from Thread per Core are lower latency and smoother performance through reduced synchronization overhead and better cache hit ratios, and those gains have to be weighed against the added complexity, effort, and risk.

About us

We are the time-series storage team of Ant Group's intelligent monitoring technology middle platform. We are using Rust to build a new-generation time-series database with high performance, low cost, and real-time analysis capability. You are welcome to join us or to recommend candidates; we are also currently looking for outstanding interns. Please contact: [email protected]

From: Rust Chinese Collection (Rust_Magazine), March issue