The 20-day semifinal finally came to an end. The international convention is to say the result first. The result of the semifinal is not ideal, once dropped from the 10th to the final 36th, mainly because the optimization card was written for 5 days, there has been no progress, and the final ranking is frozen in the second page of the leaderboard. After reviewing the pain, this article lists the knowledge learned in the rematch, the successful optimization and the unsuccessful optimization.

The problem is introduced

Implement an in-process queuing engine in Java or C++ that can support more than 1 million queues on a single server.

public abstract class QueueStore {
    abstract void put(String queueName, byte[] message);
    abstract Collection<byte[]> get(String queueName, long offset, long num);
}
Copy the code

Write an implementation of the above interface.

The put method writes a message to a queue. This interface needs to be thread safe, and the benchmark program calls this interface concurrently to put it. The contents of each queue store messages in the order they were sent (think of it as a List in Java), and each message has an index starting from 0. QueueName indicates the name of the queue, and Message indicates the content of the message. Content is generated randomly during evaluation. Most messages are about 58 bytes in length, and a few messages are about 1K in length.

The get method reads a batch of messages from a queue in the order they are sent. This interface needs to be thread safe, that is, the benchmark program will call this interface concurrently to get, and the returned Collection will be concurrently read, but no write is involved, so it only needs to be thread read safe. If there are enough messages, num is returned. Otherwise, only the existing messages are returned. If there are not enough messages, an empty set is returned.

Introduction to the Evaluation Program

  1. Sending phase: the message size is about 58 bytes, the number of messages is about 2 billion, that is, the total number of sent data is about 100G, and the total number of queue columns is 100w
  2. Index verification: Random verification is performed on the indexes of all queues. On average, each queue is checked 1 or 2 times. (Random consumption)
  3. Sequential consumption phase: 20% of the queue is selected for all reading and verification; (Sequential consumption)
  4. The maximum sending duration cannot exceed the 1800s. The maximum time of index verification and sequential consumption can be no more than 1800s. A timeout is considered a profile failure.
  5. The number of threads in each phase ranges from 20 to 30

The test environment was 4C8G ECS, and the maximum JVM size was limited to 4GB(-xmx 4G). Carry a 300 GB SSD disk. For Java players, the amount of memory available can be interpreted as: 4 GB out of the heap and 4 gb in the heap.

The problem analysis

The interface description is very simple. There is only one put and one get method. It is necessary to pay special attention to the benchmark program. In the sending phase, 100W queue is required. The amount of each sent is only 58 bytes, and the final total amount of data is 100G. Both the index checksum and the sequential consumption phases are called GET interfaces. The difference is that the former index checksum is a random consumption, while the latter is a full sequential consumption starting from index 0 for 20% of the queue. The characteristics of the benchmark program are critical to the final storage design.

One of the difficulties of the final competition is the design of the single million queue, according to the data

  • A single Kafka server has more than 64 queues/partitions. The number of Kafka partitions should not be too large
  • RocketMQ supports up to 50,000 queues per single server

As for the million-queue usage scenario, you can only think of IOT scenarios where there is such a need. Compared with the preliminary competition, the design of the semifinal is more uncertain, and the top contestants may choose different designs.

The investigation points of the rematch mainly include the following aspects: disk block read and write, read and write buffer, sequential read and write and random read and write, pageCache, sparse index, queue storage design, etc.

The final result was 126W TPS, while the TPS of the first tier reached 200 W +. In view of this, I do not want to summarize the preliminary contest in accordance with the optimization process, but to do their own pre-research, as well as design ideas to share with you, do not understand the document IO readers can also take this article as a popular science to read the article.

Train of thought,

Determine the file read and write mode

As a loyal Java fan, it was natural to use Java as the language of choice for the competition, and even though the final ranking was dominated by the Cpp masters, they had no choice but to throw Cpp aside after graduation. File read/write interfaces in Java can be roughly divided into three categories:

  1. IO: FileInputStream; FileOuputStream: FileOuputStream
  2. NIO read and write, in the java.nio package, related classes: FileChannel, ByteBuffer
  3. Mmap Memory map, located in the java.nio package, related classes: FileChannel, MappedByteBuffer

Standard IO reading and writing is not of research value, so it passes directly. Therefore, NIO and Mmap become the first research object.

In the first stage, Mmap was investigated. Search around and almost all the articles agree that memory mapping techniques such as Mmap are the fastest. For those of you who are not familiar with memory mapping technology, Mmap maps files directly to user mode memory addresses, so that operations on files are no longer write/read, but directly to memory addresses.

public void test1(a) throws Exception {
    String dir = "/Users/kirito/data/";
    ensureDirOK(dir);
    RandomAccessFile memoryMappedFile;
    int size = 1 * 1024 * 1024;
    try {
        memoryMappedFile = new RandomAccessFile(dir + "testMmap.txt"."rw");
        MappedByteBuffer mappedByteBuffer = memoryMappedFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
        for (int i = 0; i < 100000; i++) {
            mappedByteBuffer.position(i * 4);
            mappedByteBuffer.putInt(i);
        }
        memoryMappedFile.close();
    } catch(Exception e) { e.printStackTrace(); }}Copy the code

The above code shows the simplest way to use Mmap, and the speed is just one word: fast! I was skeptical to find more evidence, good source code is always the first reference, looking at RocketMQ design, you can see NIO and Mmap in the source code, but more read and write operations seem to favor Mmap. RocketMQ source org. Apache. RocketMQ. Store. MappedFile in two writing methods exist at the same time, consult the @ originality probably come to the conclusion: after zero RocketMQ main writing is done through Mmap.

However, there are two problems when using Mmap as a writing scheme, which expose the limitations of Mmap simply from the perspective of use:

  1. Mmap in Java can only map 1.5 to 2 gb of file memory at a time, but in fact our data files are more than 100 GB, which leads to the first problem: either you need to physically split the files into multiple files; Either you need to do a logical split of the file mapping, large file segmentation mapping. RocketMQ limits the single file size to avoid this problem.

  1. Mmap is fast because it is accelerated by memory. The put behavior of the mappedByteBuffer is actually a memory operation. The actual flush behavior depends on the operating system’s periodic flush or manual call to the mappedByteBuffer.force() interface. Otherwise, the machine will be stuck (the conclusion of the test). The use of Mmap is difficult to control due to the limited memory in the rematch environment.

After such a process and data collection, it is finally determined that Mmap has advantages in scenarios where memory is relatively abundant and data volume is small (most articles conclude that Mmap is suitable for reading and writing large files, which is not a rigorous conclusion in my opinion).

The second phase was to investigate Nio’s FileChannel, which was my final read-write solution.

Since each message is only about 58 bytes, writing directly through FileChannel is bound to be a bottleneck. In fact, if you do that, you won’t even be able to score in the rematch. Another claim is that the smallest write to an SSD is in 4K, and if a write is below 4K, it actually takes the same amount of time. Here involves an important test point: block reading and writing.

According to the introduction of SSD cloud disks on aliyun, only 16kb to 64kb can be written to obtain the ideal IOPS. The characteristics of the file system block storage inspired us to set up a memory write buffer, a single message is written to the memory buffer, the buffer is full, and the FileChannel is used to flush the disk. In practice, using FileChannel with a buffer provides the same write performance as Mmap with full memory, and FileChannel has no limit on file size and relatively simple control, so I decided to use FileChannel for reading and writing.

Determine the storage structure and index structure

Due to the message queue is the background of the problem, review the random testing and 3 phase, the phase 2 order consumption will read several successive messages at a time, and the order of the three phase consumer consumption from 0 index in the queue until the last message, these factors have inspired us: should be the same message queue as far as possible put together. In the previous section, we mentioned write buffers, which fit well with the design here. For example, we can set one write buffer per queue (Java has 4g of out of heap memory, 100W of queue, a queue allocates 4K of out of heap memory using DirectByteBuffer, This ensures that the buffer will not burst memory), so that the same buffer messages together to disk, to ensure the order of the messages in the block, that is, “the same queue messages as possible to store together”. Accessing messages in blocks currently appears to have two advantages:

  1. Read messages by block => Read messages by block to take advantage of block read and reduce I/O times
  2. Full index => Sparse index. The data in a block is continuous, so you only need to record the physical file offset of the block + the number of messages in the block to calculate the physical location of a message. This greatly reduces the number of indexes. A little calculation shows that it is possible to maintain the index of queue blocks in memory using a Map data structure with Key queueName and Value List. If you follow the traditional design of one index file per queue, millions of files will inevitably exceed the default system file handle limit. The index is stored in memory to avoid the problem of the number of file handles, and the speed does not have to be large. File IO and memory IO are not on the same order of magnitude.

Since the problem specifies that the message body is not a fixed length, most messages are 58 bytes, and a few messages are 1K bytes, so the structure of short+byte[] is used to store the message body. Short records the actual length of the message, and byte[] records the complete message body. Short is 2 bytes less than int, 2*2 billion messages, can reduce the amount of data 4g.

Dense index is used to index the full amount of messages, which is applicable to unordered messages. The number of indexes is large, and the data can be accessed by one.

Sparse index is suitable for the messages stored in blocks. It is ordered within blocks. It is suitable for ordered messages.

Sparse indexes are ideal for this scenario because of the sequential storage and sequential consumption of message queues, as well as the limitation of SSD cloud disk’s minimum access unit of 4K (much larger than a single message). As for the data file, can be made into parameters, according to the actual test to determine whether the effect of multiple files is better, or single file, this scheme supports 100G single file.

Memory read/write buffer

In the design of sparse index, we mentioned the concept of write buffer. According to the calculation, if a queue of 100W is allocated a write buffer, it can only be allocated 4k at most, which happens to be the smallest SSD write block size (however, according to the data given by SSD cloud disk before, Write 64K at a time to full IO).

Write 4k at a time, which results in a block size of 4K in the physical file and 4k at a time when the same read is taken out.

// Write the buffer
private ByteBuffer writeBuffer = ByteBuffer.allocateDirect(4 * 1024);
// Use short to record the message length
private final static int SINGLE_MESSAGE_SIZE = 2;

public void put(String queueName,byte[] message){
    // The buffer is full
    if (SINGLE_MESSAGE_SIZE + message.length  > writeBuffer.remaining()) {
        / / tray
        flush();
    }
    writeBuffer.putInt(SINGLE_MESSAGE_SIZE);
    writeBuffer.put(message);
    this.blockLength++;
}
Copy the code

For parts less than 4K, you can choose to add 0 or skip. The profiler guarantees that writes at the queue level are synchronized, so we can’t worry about synchronization on the same queue. After the write is done, the same logic is done for the read. Since get operations are concurrent, phases 2 and 3 can have 10 to 30 threads consuming the same queue concurrently, the read buffer for get operations can be designed as ThreadLocal

, which clears each time. This ensures that the buffer is fresh every time it is read, and reduces the creation of read buffers, otherwise frequent FULL GC will occur. The read pseudocode will not be posted for the time being because such a GET scheme is not the final one.

Here the overall design architecture has come out, and the main logic of the write process and read process is as follows:

Write process:

Read process:

Memory read cache optimization

After several overturns and retries, the above architecture is determined. The advantage of this architecture is that it is very simple and clear. In fact, the code amount of my first version of the design is 2~3 times that of the above, but the actual effect is not ideal. The above architecture can achieve a score of 70~ 80W TPS, which can only be counted as the third tier. On this basis, the read cache optimization can reach 126W TPS. Before I get to read cache optimization, let me introduce the concept of PageCache.

The Linux kernel caches the most recently accessed file pages in memory for a period of time. This file cache is called PageCache. This is shown in the figure above. The normal read() operation occurs between the application-provided buffer and the PageCache. The prefetch algorithm is responsible for populating the PageCache. Applications typically have a small read cache. For example, the file copy command cp has a read/write granularity of 4KB. The kernel’s prefetch algorithm prereads I/O at whatever size it considers more appropriate, such as 16-128KB.

So in general we think that sequential reads are faster than random reads, and PageCache is the biggest contributor.

Back to the question, this is nice, because the same queue of data on disk is partially continuous (the same block is continuous), in fact, a 4KB block can store more than 70 data, and in the sequential consumption phase, an offset is usually 10, with the PageCache prefetch mechanism, 7 file IO can be reduced to 1! This is a great optimization, but the above architecture is only 70~ 80W TPS, which made me confused. After many times of searching for information, I finally located the problem under the reminder of @Jiang Xuelei.

There are two possible causes of not being able to use pageCache for caching during a match

  1. Since I’m using FIleChannel to read and write, NIO’s reads and writes probably go through Direct IO, so they don’t go through the PageCache layer at all.
  2. In the test environment, memory is limited and The PageCache effect is minimal under IO – intensive conditions.

Although I am not sure exactly what causes the PageCache to be unavailable, my storage scheme still satisfies the feature of sequential reading, and I can use the out-of-heap memory to simulate a “PageCache” by myself, so that the TPS will have a very high improvement when the 3 stages of sequential consumption.

One read buffer per queue is used for sequential reads, and to make the GET phase concurrency free, I chose to reuse the read buffer and put a queue-level lock on the GET operation, which is a small sacrifice because phase 2 does not conflict and phase 3 does not have a large probability of conflict. The modified read cache scheme is as follows:

After the cache modification, you can also use Direct IO to achieve similar optimization to PageCache, but with more control and less frequent page missing interrupts. With this optimization, plus some GC optimization, 126W TPS was achieved. The overall scheme is described.

Other optimization

There are some optimizations that have little impact on the overall process and are singled out for separate introduction.

A different strategy can be adopted for phase 2 random index detection and phase 3 sequential consumption, where the required data can be read directly without caching (because it is random detection, the read cache will definitely not be hit).

Make the number of files as a parameter and adjust the parameter to determine whether the TPS is high for multiple files or single file. In fact, after testing, it is found that the difference is not very big, and the effect of single file is slightly better. Because it is an SSD cloud disk and there is no magnetic head, I really don’t understand the principle.

Gc optimizes not to use lists where arrays can be used. You can use arrays to manage basic data types. Small objects are very unfriendly to GC. Java is still one garbage collector behind Cpp, both in the preliminary and the final. Ensure that full GC does not occur in the whole process.

Failure to optimize and reflect

This competition left a big regret, because the optimization of write was not done well. After the read cache was completed, the total time of my stage 2 and stage 3 was 400+s, which was a good result, but the writing time was 1300+s. I used the multi-threaded synchronous flush scheme, but also tried the following writing scheme:

  1. Asynchronous commit write buffer, single thread direct flush
  2. Asynchronous commit write buffer, set the secondary buffer 64K ~64M, single-thread use secondary buffer flush disk
  3. Synchronously copy the data from the write buffer to a LockFreeQueue, and use it in a single line to play the full IOPS
  4. Every 16 queues share a write buffer, so that the write buffer can be controlled up to 64K, and the data in the same queue can be sorted when the disk is flushed.

However, they all ended in failure and did not get the gist of writing optimization, which is the biggest regret of this competition.

There is also a mistake is that the evaluation environment used cloud disk SSD and MY local Mac UNDER the SSD storage structure gap is too big, plus some Mac OS and Linux gap, resulting in the local success of the optimization online completely can not be reflected, or rent an Ali cloud environment is more reliable.

On the other hand, I was not familiar with the storage and MQ architecture design. Some of the optimizations I made for Kafka and RocketMQ were learned and applied. I was not sure if they were right, which led to some detours. In contrast, I admire the depth and breadth of middleware knowledge and there is still a lot to learn.

The competing comprehension

The first feeling is tired, the second feeling is cool. I believe that many contestants are working like me, they work during the day, so they can only spare time for the competition at night. It is really unfriendly to me of 966. The time of the preliminary contest was extended once, which was a relief. The Netty in the preliminary contest and the storage design in the final are all unforgettable memories. I also met many friends in the contest, including students and workers. Thank you for your tireless teaching and thought-provoking discussion. It’s true that you can learn from different people what you don’t know.

According to news, ali middleware competition is likely to be the last, whether it is for what reason, as a participant, I feel deep regret, I hope we can have a chance to take part in the next contest of middleware, is also looking forward to seeing more of the same type of event held by the big Internet companies, and the bosses to compete, meet more new friends feel good.

Although I failed to make the final, I still expect the 11 finalists to bring a wonderful defense, so as to answer my failed writing plan. In the future, I will consider absorbing the optimization ideas of the top several JAVA, and organize them into the final and perfect scheme. Current proposal git address, warehouse has publicly: code.aliyun.com/250577914/q…

Welcome to follow my wechat official account: “Kirito technology sharing”, any questions about the article will be answered, bring more Java related technology sharing.