I took part in the fourth Alibaba Middleware Performance Challenge in June and learned a lot, so I have been keeping an eye on the programming contests Alibaba runs on Tianchi. At the end of September I noticed this PolarDB database performance contest and signed up early. The warm-up round started on October 25th, and the preliminary round officially ran from November 5th to November 19th, so it will already be over by the time this article is published. Since I was looking for an internship until November I did not prepare at all; I wrote the first version of the code around November 10th and stopped tuning on the evening of November 18th. My final preliminary rank is 42nd with a time of 240.69 seconds, still a long way behind the top contestants. I am writing this post mainly to share the ideas I tried during the contest and how the result improved bit by bit. GitHub: github.com/AlexZFX/eng… Currently only the preliminary-round code is there; I will keep updating it after the final round.

Problem introduction

PolarDB, as a representative of hardware/software co-design, makes full use of new hardware and passes the hardware dividend on to users to obtain extreme data performance. In the design of PolarDB, Optane SSDs are used as the write buffer for all hot data and are accessed via kernel bypass to achieve extreme performance. The background of this contest is therefore the Optane SSD, on top of which contestants explore how to implement an efficient KV storage engine.

The above is the official background provided by Alibaba Cloud. The concrete task is: implement a simplified but efficient KV storage engine that supports Write and Read interfaces.

Evaluation logic: the evaluation is divided into two stages:


1) Recovery correctness evaluation. At this stage the evaluation program concurrently writes specific data (key 8B, value 4KB) while executing kill -9 at an arbitrary moment to simulate an unexpected process exit (the engine must guarantee that data it has already acknowledged is not lost when the process exits unexpectedly). It then reopens the DB and calls the Read interface to verify correctness.

2) Performance evaluation

  • Random write: 64 threads write random data concurrently; each thread calls Write 1 million times (key 8B, value 4KB).
  • Random read: 64 threads read random data concurrently; each thread calls Read 1 million times. Note: in stage 2.2, every kv read back is checked against what was written; if a check fails, the evaluation terminates and fails.

In short, we can extract the following requirements and information:

  1. Implement the core logic of a KV database, mainly the open, read, and write interfaces.
  2. Support concurrent reads and writes from multiple threads.
  3. Ensure that data is not lost once a write call has returned successfully (after kill -9, writes that had already returned must survive; writes that had not yet returned do not count as lost).
  4. The lengths of key and value are fixed: the key is 8B and the value is 4KB.
  5. Either Java or C++ can be used; the memory limit is 3 GB for Java and 2 GB for C++.
  6. Disk usage must not exceed 320 GB.

Writing process

The whole contest took about a week, during which I made many attempts; the result went from more than 900s on the first run down to 240s at the end. Below I describe the idea and progress of each version. (The numbers in the headings below indicate how many key files and value files each version used.)

Overall approach

Let's do some simple math first: key + offset = (8 + 8) × 64,000,000 / 1024 / 1024 ≈ 977 MB; value = 4096 × 64,000,000 / 1024 / 1024 / 1024 ≈ 244 GB. So the disk and memory limits are not particularly tight and leave enough room for a reasonable design.

Because the key is an 8B byte array, it is easy and convenient to convert it to a long for the calculations that follow, so the keys discussed below are all treated as longs (a small sketch of this conversion follows the list below). The original idea was roughly this:

  • Use a hash map to keep all key-offset pairs in memory. On open, load the keys and offsets from the files; on read, just look up the offset for the key and read the value at that position in the corresponding value file. The choice of hash map matters a lot here: Java's own HashMap stores boxed objects, so one Long-Long entry takes roughly 40 B of memory, which would blow the 3 GB limit. I ended up choosing the primitive-type hash map from the open-source HPPC library, based mainly on another contestant's article, "JMH test the performance of large HashMap"; he has also written his own summary and analysis of this contest, which is well worth reading, and there is still a big gap between his solution and mine.
  • The initial plan was to hash the key and store all values whose keys hash to the same bucket in the same value file, while the key and the offset of its value are written together into a key file.
  • All keys and values are appended to the end of their files, regardless of duplicates. When loading, a later record for the same key simply overwrites the earlier one in the map, so key duplication needs no special handling.
  • To ensure data is not lost when the process is killed, keys and values are never cached or written asynchronously; otherwise the verification phase could fail. When the write interface is invoked, the data goes straight to disk.
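Since everything below treats the 8-byte key as a long, here is a minimal sketch of that conversion. The repository has its own Util.bytes2long, which may be implemented differently; this version simply assumes an 8-byte big-endian key:

// Minimal sketch of the byte[] -> long conversion (the real Util.bytes2long may differ).
// Assumes the key is exactly 8 bytes, interpreted in big-endian order.
public static long bytes2long(byte[] key) {
    long result = 0;
    for (int i = 0; i < 8; i++) {
        result <<= 8;
        result |= (key[i] & 0xFF);
    }
    return result;
}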

First version: FileChannel reads and writes, 1 key file + 128 value files, 381.79s

The first thing I wanted to do was to write all keys into one file. At the beginning I overlooked a small point: the key and its offset were written to the key file as two separate, non-atomic writes, so records from different threads could interleave and some keys ended up paired with the wrong values (data that should be laid out as key, offset, key, offset could end up as key, key, offset, offset). After hitting this error I simply wrapped that write in synchronized. The first run scored 968s; fixing this small problem quickly brought the score down to a much better 381.79s. The code at this point looked roughly like this.

@Override
public void open(String path) throws EngineException {
    File file = new File(path);
    // create the directory if it does not exist
    if (!file.exists()) {
        if (!file.mkdir()) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "failed to create file directory: " + path);
        } else {
            logger.info("created file directory: " + path);
        }
    }
    // create FILE_COUNT FileChannels for sequential value writes
    RandomAccessFile randomAccessFile;
    if (file.isDirectory()) {
        for (int i = 0; i < FILE_COUNT; i++) {
            try {
                randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                FileChannel channel = randomAccessFile.getChannel();
                fileChannels[i] = channel;
                offsets[i] = new AtomicLong(randomAccessFile.length());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    } else {
        throw new EngineException(RetCodeEnum.IO_ERROR, "path is not a directory");
    }
    File keyFile = new File(path + File.separator + "key");
    if (!keyFile.exists()) {
        try {
            keyFile.createNewFile();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // load all key/offset pairs into the hash map
    try {
        randomAccessFile = new RandomAccessFile(keyFile, "rw");
        keyFileChannel = randomAccessFile.getChannel();
        ByteBuffer keyBuffer = ByteBuffer.allocate(KEY_LEN);
        ByteBuffer offBuffer = ByteBuffer.allocate(KEY_LEN);
        keyFileOffset = new AtomicLong(randomAccessFile.length());
        long temp = 0, maxOff = keyFileOffset.get();
        while (temp < maxOff) {
            keyBuffer.position(0);
            keyFileChannel.read(keyBuffer, temp);
            temp += KEY_LEN;
            offBuffer.position(0);
            keyFileChannel.read(offBuffer, temp);
            temp += KEY_LEN;
            keyBuffer.position(0);
            offBuffer.position(0);
            keyMap.put(keyBuffer.getLong(), offBuffer.getLong());
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void write(byte[] key, byte[] value) throws EngineException {
    long numkey = Util.bytes2long(key);
    int hash = hash(numkey);
    long off = offsets[hash].getAndAdd(VALUE_LEN);
    keyMap.put(numkey, off + 1);
    try {
        // write key and offset to the key file
        localKey.get().putLong(0, numkey).putLong(8, off + 1);
        localKey.get().position(0);
        keyFileChannel.write(localKey.get(), keyFileOffset.getAndAdd(KEY_AND_OFF_LEN));
        // copy the value into the thread-local buffer
        localBufferValue.get().position(0);
        localBufferValue.get().put(value, 0, VALUE_LEN);
        // write the buffer to the value file
        localBufferValue.get().position(0);
        fileChannels[hash].write(localBufferValue.get(), off);
    } catch (IOException e) {
        throw new EngineException(RetCodeEnum.IO_ERROR, "write error");
    }
}

@Override
public byte[] read(byte[] key) throws EngineException {
    long numkey = Util.bytes2long(key);
    int hash = hash(numkey);
    long off = keyMap.get(numkey);
    if (off == 0) {
        throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + " not found");
    }
    try {
        localBufferValue.get().position(0);
        fileChannels[hash].read(localBufferValue.get(), off - 1);
    } catch (IOException e) {
        throw new EngineException(RetCodeEnum.IO_ERROR, "read error");
    }
    localBufferValue.get().position(0);
    localBufferValue.get().get(localByteValue.get(), 0, VALUE_LEN);
    return localByteValue.get();
}

  • Both localKey and localBufferValue are thread-local DirectByteBuffers (some differences between HeapByteBuffer and DirectByteBuffer are discussed later). They are used as the buffers for FileChannel writes (the FileChannels are created in open), which avoids allocating a new block of memory on every write.
  • The hash used here is simply key & 0x7F, which spreads the values across 128 value files. (This hash is simple and efficient, but if the key distribution is not uniform it could make one file very large; more on that later.) A sketch of both pieces follows.
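For reference, those two pieces look roughly like this (a sketch only; field and method names are illustrative and sit inside the engine class):

// One reusable 4KB direct buffer per thread, so writes do not allocate new memory each time.
private static final ThreadLocal<ByteBuffer> localBufferValue =
        ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(VALUE_LEN));

// Keep the low 7 bits of the key: 128 buckets, one value file per bucket.
private static int hash(long key) {
    return (int) (key & 0x7F);
}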

At this point open took nearly 90s, which is clearly beyond the acceptable range, so this part was optimized next.

Second version: FileChannel with a 64-thread open, 64 key files + 128 value files, 260.96s

The open was far too slow, so it became the main focus, and I made a lot of changes during this time. The process went roughly like this.

  1. A single key file, a single map, the key-file offset split into 64 slices: no result. I actually put in a lot of work here before getting anywhere, because local tests kept failing. The reason, once I thought about it, is that when the single key file is loaded concurrently, a later record for the same key does not necessarily overwrite the earlier one, so the value read back may be wrong. The conclusion is that records for the same key must be replayed in strict write order.
  2. 64 key files, a single map: 301.49s. Because of the above, I hashed the keys as well and split them into 64 key files by hash, so that all records for the same key land in the same file in write order and can be replayed strictly in order. This version passed a few local tests and I thought it was fine, but it failed online. That made me look at the thread safety of the HPPC map itself; I submitted a version with the map access locked and, sure enough, it passed, scoring 301.49s. A quick look at the source confirmed the map is not thread safe, which prompted the next change.
  3. 64 key files, 64 maps: 260.96s. So I split the map into 64 hash maps, one per key file. The catch is that the capacity of each map can no longer be determined exactly with this split; after a simple online log test I settled on a size and finished the 64-map version. With the concurrency problem solved, the score improved quite a bit to 260.96s. Open was now under 10s, but there was still room for improvement.

The main changes in this version are in open, so only the open method is posted below.

@Override
public void open(String path) throws EngineException {
    File file = new File(path);
    // create the directory if it does not exist
    if (!file.exists()) {
        if (!file.mkdir()) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "failed to create file directory: " + path);
        } else {
            logger.info("created file directory: " + path);
        }
    }
    RandomAccessFile randomAccessFile;
    if (file.isDirectory()) {
        try {
            // build the key FileChannels and initialize the maps
            for (int i = 0; i < THREAD_NUM; i++) {
                randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
                FileChannel channel = randomAccessFile.getChannel();
                keyFileChannels[i] = channel;
                keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());
            }
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
            CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
            for (int i = 0; i < THREAD_NUM; i++) {
                if (!(keyOffsets[i].get() == 0)) {
                    final long off = keyOffsets[i].get();
                    final int finalI = i;
                    executor.execute(() -> {
                        int start = 0;
                        long key;
                        int keyHash;
                        while (start < off) {
                            try {
                                localKey.get().position(0);
                                keyFileChannels[finalI].read(localKey.get(), start);
                                start += KEY_AND_OFF_LEN;
                                localKey.get().position(0);
                                key = localKey.get().getLong();
                                keyHash = keyFileHash(key);
                                keyMap[keyHash].put(key, localKey.get().getInt());
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        countDownLatch.countDown();
                    });
                } else {
                    countDownLatch.countDown();
                }
            }
            countDownLatch.await();
            executor.shutdownNow();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        // create FILE_COUNT FileChannels for sequential value writes
        for (int i = 0; i < FILE_COUNT; i++) {
            try {
                randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                FileChannel channel = randomAccessFile.getChannel();
                fileChannels[i] = channel;
                valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    } else {
        throw new EngineException(RetCodeEnum.IO_ERROR, "path is not a directory");
    }
}

A few smaller things were also done in this version:

  • By adding logs to the close method, I counted the keys and values online and found the distribution was very uniform: every key file and value file ended up roughly the same size.
  • The offset is now stored as an int and shifted left by 12 bits whenever the value file is read; this saves some memory that can be used for other things (see the sketch below).
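Concretely, because every value is exactly 4 KB and written 4 KB-aligned, the int that goes into the map is a slot index rather than a byte offset, and the byte position is recovered with a 12-bit shift (2^12 = 4096), just as the later versions' write and read paths do:

int slot = valueOffsets[hash].getAndIncrement();   // slot index, not a byte offset
long pos = ((long) slot) << SHIFT_NUM;             // byte offset = slot * 4096
fileChannels[hash].write(valueBuffer, pos);        // the read path shifts back the same way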

Third version: mmap for open, 64 key files + 64 value files, 245.18s

I had been thinking about mmap (MappedByteBuffer in Java) for a while, but I was not sure whether mmap can guarantee data integrity when the process is killed with kill -9. Also, if all data were written through mmap, I could not determine the file size in advance (mmap requires the mapped size up front), nor easily find the position to append from after a kill. So I decided to take it one step at a time and leave full mmap writes for later.

Reading the key files with mmap during open, however, carries no such risk, so I changed open again. At this point there were still 64 key files and 128 value files, and the score was 248.58s; open was compressed to within 1s, about 600ms, which I was basically satisfied with.

I later switched to 64 value files as well, so the key and value files share a single hash per operation; read and write speed seemed to improve slightly, to 245.18s.

The open code is as follows

@Override
public void open(String path) throws EngineException {
    File file = new File(path);
    // create the directory if it does not exist
    if (!file.exists()) {
        if (!file.mkdir()) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "failed to create file directory: " + path);
        } else {
            logger.info("created file directory: " + path);
        }
    }
    RandomAccessFile randomAccessFile;
    if (file.isDirectory()) {
        try {
            // build the key FileChannels and initialize the maps
            for (int i = 0; i < THREAD_NUM; i++) {
                randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
                FileChannel channel = randomAccessFile.getChannel();
                keyFileChannels[i] = channel;
                keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());
            }
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
            CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
            for (int i = 0; i < THREAD_NUM; i++) {
                if (!(keyOffsets[i].get() == 0)) {
                    final long off = keyOffsets[i].get();
                    final int finalI = i;
                    executor.execute(() -> {
                        int start = 0;
                        try {
                            // map the whole key file once and read it sequentially
                            MappedByteBuffer mappedByteBuffer = keyFileChannels[finalI].map(FileChannel.MapMode.READ_ONLY, 0, off);
                            while (start < off) {
                                start += KEY_AND_OFF_LEN;
                                keyMap[finalI].put(mappedByteBuffer.getLong(), mappedByteBuffer.getInt());
                            }
                            unmap(mappedByteBuffer);
                            countDownLatch.countDown();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                } else {
                    countDownLatch.countDown();
                }
            }
            countDownLatch.await();
            executor.shutdownNow();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        // create FILE_COUNT FileChannels for sequential value writes
        for (int i = 0; i < FILE_COUNT; i++) {
            try {
                randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                FileChannel channel = randomAccessFile.getChannel();
                fileChannels[i] = channel;
                valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    } else {
        throw new EngineException(RetCodeEnum.IO_ERROR, "path is not a directory");
    }
}
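The unmap call in the snippet above is a small helper that is not shown. On JDK 8 a MappedByteBuffer can be released eagerly through its internal Cleaner; the helper in the repository may differ, but a sketch relying on the JDK 8 internal API looks like this:

import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;

// Eagerly release a MappedByteBuffer instead of waiting for GC (JDK 8 internal APIs).
private static void unmap(MappedByteBuffer buffer) {
    if (buffer == null) {
        return;
    }
    Cleaner cleaner = ((DirectBuffer) buffer).cleaner();
    if (cleaner != null) {
        cleaner.clean();
    }
}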

I also ran into some problems in this version and read a lot of articles along the way. The main takeaways:

  • I hit an OOM once during open, which should not have happened given my JVM parameters. During this period I had also tested reading the values from 256 files via mmap, and the MappedByteBuffer blew up (Java's MappedByteBuffer can map at most 2 GB at a time). An online log test showed that at the beginning of the evaluation a large number of keys with the same hash are written, so a single value file can grow beyond 2 GB. This part of the evaluation is not reflected in the official logs, and the discovery also helped a little with the final change.
  • I read another contestant's summary and learned a few things about mmap: when a process exits abnormally, even if the mmapped data has not yet been flushed, the kernel will still write the dirty pages back to disk after the process is killed. As long as the machine does not actually lose power, the data is safe, which was very helpful.

Fourth version: mmap reads and writes keys, FileChannel reads and writes values, 64 + 64 files, 240.69s

Based on the findings from version 3, I changed the key writes from FileChannel to mmap. The mapped size of each key file is set slightly larger than needed, and the key-file write position after open is derived from the size of the corresponding value file (valueFileLength / 4096 * 12). This optimization brought roughly a 2-3s improvement.
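In the final version, open derives both write positions from the length of the value file alone, which is what makes the mmapped key file safe to resume after a kill -9:

// number of 4KB values already in this file (length >>> 12)
valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
// each value has exactly one 12-byte (key + int offset) record in the key file
keyOffsets[i] = new AtomicInteger(valueOffsets[i].get() * KEY_AND_OFF_LEN);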

I also did some simple JVM tuning, changing the ratio of the young generation to the old generation from 1:1 to 6:1, which gave roughly another 2s.
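For reference, with standard HotSpot options such a split could be expressed roughly like this (illustrative values only; the exact flags used are not shown here):

-Xms3g -Xmx3g -Xmn2633m    (about 6/7 of a 3 GB heap for the young generation, i.e. roughly 6:1)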

Finally, here is the complete code. If you are interested in the whole process, you can also clone it from GitHub.

package com.alibabacloud.polar_race.engine.common;

import com.alibabacloud.polar_race.engine.common.exceptions.EngineException;
import com.alibabacloud.polar_race.engine.common.exceptions.RetCodeEnum;
import com.carrotsearch.hppc.LongIntHashMap;
import io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class EngineRace extends AbstractEngine {

    private static Logger logger = LoggerFactory.getLogger(EngineRace.class);
    // length of one key + offset record: 12B (8B key + 4B int offset)
    private static final int KEY_AND_OFF_LEN = 12;
    // number of threads
    private static final int THREAD_NUM = 64;
    // value length: 4KB
    private static final int VALUE_LEN = 4096;
    // expected number of keys stored per map
    private static final int PER_MAP_COUNT = 1024000;

    private static final int SHIFT_NUM = 12;
    // number of value files: 64
    private static final int FILE_COUNT = 64;

    private static final int HASH_VALUE = 0x3F;

    private static final LongIntHashMap[] keyMap = new LongIntHashMap[THREAD_NUM];

    static {
        for (int i = 0; i < THREAD_NUM; i++) {
            keyMap[i] = new LongIntHashMap(PER_MAP_COUNT, 0.98);
        }
    }

    // FileChannels of the key files
    private static FileChannel[] keyFileChannels = new FileChannel[THREAD_NUM];

    private static AtomicInteger[] keyOffsets = new AtomicInteger[THREAD_NUM];

    private static MappedByteBuffer[] keyMappedByteBuffers = new MappedByteBuffer[THREAD_NUM];

    // FileChannels of the value files
    private static FileChannel[] fileChannels = new FileChannel[FILE_COUNT];

    private static AtomicInteger[] valueOffsets = new AtomicInteger[FILE_COUNT];

    private static FastThreadLocal<ByteBuffer> localBufferValue = new FastThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() throws Exception {
            return ByteBuffer.allocate(VALUE_LEN);
        }
    };

    @Override
    public void open(String path) throws EngineException {
        File file = new File(path);
        // create the directory if it does not exist
        if (!file.exists()) {
            if (!file.mkdir()) {
                throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);
            } else {
                logger.info("创建文件目录成功:" + path);
            }
        }
        RandomAccessFile randomAccessFile;
        // proceed only if path is a directory
        if (file.isDirectory()) {
            try {
                // first create FILE_COUNT FileChannels for sequential value writes, and derive the offsets from the value file sizes
                for (int i = 0; i < FILE_COUNT; i++) {
                    try {
                        randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                        FileChannel channel = randomAccessFile.getChannel();
                        fileChannels[i] = channel;
                        // continue writing from the current file length (stored as a 4KB slot index)
                        valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
                        keyOffsets[i] = new AtomicInteger(valueOffsets[i].get() * KEY_AND_OFF_LEN);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                // then build the key FileChannels, map them, and initialize the maps
                for (int i = 0; i < THREAD_NUM; i++) {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    keyFileChannels[i] = channel;
                    keyMappedByteBuffers[i] = channel.map(FileChannel.MapMode.READ_WRITE, 0, PER_MAP_COUNT * 20);
                }
                CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
                for (int i = 0; i < THREAD_NUM; i++) {
                    if (!(keyOffsets[i].get() == 0)) {
                        final long off = keyOffsets[i].get();
                        final int finalI = i;
                        final MappedByteBuffer buffer = keyMappedByteBuffers[i];
                        new Thread(() -> {
                            int start = 0;
                            while (start < off) {
                                start += KEY_AND_OFF_LEN;
                                keyMap[finalI].put(buffer.getLong(), buffer.getInt());
                            }
                            countDownLatch.countDown();
                        }).start();
                    } else {
                        countDownLatch.countDown();
                    }
                }
                countDownLatch.await();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");
        }
    }

    @Override
    public void write(byte[] key, byte[] value) throws EngineException {
        long numkey = Util.bytes2long(key);
        int hash = valueFileHash(numkey);
        int off = valueOffsets[hash].getAndIncrement();
        try {
            ByteBuffer keyBuffer = keyMappedByteBuffers[hash].slice();
            keyBuffer.position(keyOffsets[hash].getAndAdd(KEY_AND_OFF_LEN));
            keyBuffer.putLong(numkey).putInt(off);
            // copy the value into the thread-local buffer
            ByteBuffer valueBuffer = localBufferValue.get();
            valueBuffer.clear();
            valueBuffer.put(value);
            valueBuffer.flip();
            fileChannels[hash].write(valueBuffer, ((long) off) << SHIFT_NUM);
        } catch (IOException e) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "写入数据出错");
        }
    }


    @Override
    public byte[] read(byte[] key) throws EngineException {
        long numkey = Util.bytes2long(key);
        int hash = valueFileHash(numkey);
        long off = keyMap[hash].getOrDefault(numkey, -1);
        ByteBuffer buffer = localBufferValue.get();
        if (off == -1) {
            throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + " not found");
        }
        try {
            buffer.clear();
            fileChannels[hash].read(buffer, off << SHIFT_NUM);
        } catch (IOException e) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "读取数据出错");
        }
        return buffer.array();
    }

    @Override
    public void range(byte[] lower, byte[] upper, AbstractVisitor visitor) throws EngineException {
    }

    @Override
    public void close() {
        for (int i = 0; i < FILE_COUNT; i++) {
            try {
                keyFileChannels[i].close();
                fileChannels[i].close();
            } catch (IOException e) {
                logger.error("close error");
            }
        }
    }

    private static int valueFileHash(long key) {
        return (int) (key & HASH_VALUE);
    }
}

This version of the code is a little different from the previous version:

  • The ByteBuffer used for reading and writing values here is a HeapByteBuffer. I originally used a DirectByteBuffer, since writing through off-heap memory is faster, but because the input is a byte[], copying it out of the heap cannot be avoided anyway. Looking at the FileChannel write path in the JDK source, I found that FileChannel itself maintains a cache of temporary DirectByteBuffers and copies heap buffers into it before writing, and after testing, the two approaches performed about the same.
  • When writing the key through mmap, slice() is called to obtain a new buffer that shares the MappedByteBuffer's content, because the MappedByteBuffer's position is not thread safe; each thread then positions its own slice. Internally the write essentially comes down to Unsafe putByte calls.

Conclusion

The preliminary round felt a little like the middleware performance contest, but compared with that one I still learned a lot. I also tried using Unsafe to implement the memory copying myself, but it did not seem to help; I suspect I was using it incorrectly. That code is in the unsafe branch on GitHub if you want to take a look.

The final round is now underway. Compared with the preliminaries it adds a requirement for a full ordered traversal (range), which is harder and more interesting, and tests the design even more. I will continue to compete in Java, and if anything comes of it I will write a corresponding summary post.